From e58e29e63994373e5645a5123e0be191bf693813 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Tue, 14 Jan 2025 19:01:14 +0100 Subject: [PATCH] pageserver: limit number of upload queue tasks (#10384) ## Problem The upload queue can currently schedule an arbitrary number of tasks. This can both spawn an unbounded number of Tokio tasks, and also significantly slow down upload queue scheduling as it's quadratic in number of operations. Touches #10096. ## Summary of changes Limit the number of inprogress tasks to the remote storage upload concurrency. While this concurrency limit is shared across all tenants, there's certainly no point in scheduling more than this -- we could even consider setting the limit lower, but don't for now to avoid artificially constraining tenants. --- libs/remote_storage/src/config.rs | 11 +++ pageserver/benches/upload_queue.rs | 2 +- .../src/tenant/remote_timeline_client.rs | 29 +++++++- pageserver/src/tenant/upload_queue.rs | 73 ++++++++++++++++--- 4 files changed, 99 insertions(+), 16 deletions(-) diff --git a/libs/remote_storage/src/config.rs b/libs/remote_storage/src/config.rs index 49b1d9dc874b..dae141bf7710 100644 --- a/libs/remote_storage/src/config.rs +++ b/libs/remote_storage/src/config.rs @@ -43,6 +43,17 @@ impl RemoteStorageKind { } } +impl RemoteStorageConfig { + /// Helper to fetch the configured concurrency limit. + pub fn concurrency_limit(&self) -> Option<usize> { + match &self.storage { + RemoteStorageKind::LocalFs { .. 
} => None, + RemoteStorageKind::AwsS3(c) => Some(c.concurrency_limit.into()), + RemoteStorageKind::AzureContainer(c) => Some(c.concurrency_limit.into()), + } + } +} + fn default_timeout() -> Duration { RemoteStorageConfig::DEFAULT_TIMEOUT } diff --git a/pageserver/benches/upload_queue.rs b/pageserver/benches/upload_queue.rs index 528b3d54907a..ed644b0e3cd5 100644 --- a/pageserver/benches/upload_queue.rs +++ b/pageserver/benches/upload_queue.rs @@ -53,7 +53,7 @@ fn bench_upload_queue_next_ready(c: &mut Criterion) { // Construct the queue. let mut queue = UploadQueue::Uninitialized; - let queue = queue.initialize_with_current_remote_index_part(&index)?; + let queue = queue.initialize_with_current_remote_index_part(&index, 0)?; // Populate inprogress_tasks with a bunch of layer1 deletions. let delete = UploadOp::Delete(Delete { diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 75e8da496d83..160276558561 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -425,8 +425,16 @@ impl RemoteTimelineClient { /// an index file upload, i.e., it's not empty. /// The given `index_part` must be the one on the remote. pub fn init_upload_queue(&self, index_part: &IndexPart) -> anyhow::Result<()> { + // Set the maximum number of inprogress tasks to the remote storage concurrency. There's + // certainly no point in starting more upload tasks than this. 
+ let inprogress_limit = self + .conf + .remote_storage_config + .as_ref() + .and_then(|r| r.concurrency_limit()) + .unwrap_or(0); let mut upload_queue = self.upload_queue.lock().unwrap(); - upload_queue.initialize_with_current_remote_index_part(index_part)?; + upload_queue.initialize_with_current_remote_index_part(index_part, inprogress_limit)?; self.update_remote_physical_size_gauge(Some(index_part)); info!( "initialized upload queue from remote index with {} layer files", @@ -441,8 +449,16 @@ impl RemoteTimelineClient { &self, local_metadata: &TimelineMetadata, ) -> anyhow::Result<()> { + // Set the maximum number of inprogress tasks to the remote storage concurrency. There's + // certainly no point in starting more upload tasks than this. + let inprogress_limit = self + .conf + .remote_storage_config + .as_ref() + .and_then(|r| r.concurrency_limit()) + .unwrap_or(0); let mut upload_queue = self.upload_queue.lock().unwrap(); - upload_queue.initialize_empty_remote(local_metadata)?; + upload_queue.initialize_empty_remote(local_metadata, inprogress_limit)?; self.update_remote_physical_size_gauge(None); info!("initialized upload queue as empty"); Ok(()) @@ -458,9 +474,15 @@ impl RemoteTimelineClient { let deleted_at = index_part.deleted_at.ok_or(anyhow::anyhow!( "bug: it is responsibility of the caller to provide index part from MaybeDeletedIndexPart::Deleted" ))?; + let inprogress_limit = self + .conf + .remote_storage_config + .as_ref() + .and_then(|r| r.concurrency_limit()) + .unwrap_or(0); let mut upload_queue = self.upload_queue.lock().unwrap(); - upload_queue.initialize_with_current_remote_index_part(index_part)?; + upload_queue.initialize_with_current_remote_index_part(index_part, inprogress_limit)?; self.update_remote_physical_size_gauge(Some(index_part)); self.stop_impl(&mut upload_queue); @@ -2355,6 +2377,7 @@ impl RemoteTimelineClient { // but for this use case it doesnt really makes sense to bring unsafe code only for this usage point. 
// Deletion is not really perf sensitive so there shouldnt be any problems with cloning a fraction of it. let upload_queue_for_deletion = UploadQueueInitialized { + inprogress_limit: initialized.inprogress_limit, task_counter: 0, dirty: initialized.dirty.clone(), clean: initialized.clean.clone(), diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index bd524e815344..09c8f6ad8c1a 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -51,6 +51,9 @@ pub enum OpType { /// This keeps track of queued and in-progress tasks. pub struct UploadQueueInitialized { + /// Maximum number of inprogress tasks to schedule. 0 is no limit. + pub(crate) inprogress_limit: usize, + /// Counter to assign task IDs pub(crate) task_counter: u64, @@ -128,8 +131,14 @@ impl UploadQueueInitialized { /// the queue if it doesn't conflict with operations ahead of it. /// /// None may be returned even if the queue isn't empty, if no operations are ready yet. + /// + /// NB: this is quadratic, but queues are expected to be small, and bounded by inprogress_limit. pub fn next_ready(&mut self) -> Option { - // NB: this is quadratic, but queues are expected to be small. + // If inprogress_tasks is already at limit, don't schedule anything more. + if self.inprogress_limit > 0 && self.inprogress_tasks.len() >= self.inprogress_limit { + return None; + } + for (i, candidate) in self.queued_operations.iter().enumerate() { // If this candidate is ready, go for it. Otherwise, try the next one. 
if self.is_ready(i) { @@ -289,6 +298,7 @@ impl UploadQueue { pub fn initialize_empty_remote( &mut self, metadata: &TimelineMetadata, + inprogress_limit: usize, ) -> anyhow::Result<&mut UploadQueueInitialized> { match self { UploadQueue::Uninitialized => (), @@ -302,6 +312,7 @@ impl UploadQueue { let index_part = IndexPart::empty(metadata.clone()); let state = UploadQueueInitialized { + inprogress_limit, dirty: index_part.clone(), clean: (index_part, None), latest_files_changes_since_metadata_upload_scheduled: 0, @@ -325,6 +336,7 @@ impl UploadQueue { pub fn initialize_with_current_remote_index_part( &mut self, index_part: &IndexPart, + inprogress_limit: usize, ) -> anyhow::Result<&mut UploadQueueInitialized> { match self { UploadQueue::Uninitialized => (), @@ -339,6 +351,7 @@ impl UploadQueue { ); let state = UploadQueueInitialized { + inprogress_limit, dirty: index_part.clone(), clean: (index_part.clone(), None), latest_files_changes_since_metadata_upload_scheduled: 0, @@ -633,7 +646,7 @@ mod tests { #[test] fn schedule_barrier() -> anyhow::Result<()> { let mut queue = UploadQueue::Uninitialized; - let queue = queue.initialize_empty_remote(&TimelineMetadata::example())?; + let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 0)?; let tli = make_timeline(); let index = Box::new(queue.clean.0.clone()); // empty, doesn't matter @@ -700,7 +713,7 @@ mod tests { #[test] fn schedule_delete_parallel() -> anyhow::Result<()> { let mut queue = UploadQueue::Uninitialized; - let queue = queue.initialize_empty_remote(&TimelineMetadata::example())?; + let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 0)?; let tli = make_timeline(); // Enqueue a bunch of deletes, some with conflicting names. 
@@ -745,7 +758,7 @@ mod tests { #[test] fn schedule_upload_conflicts() -> anyhow::Result<()> { let mut queue = UploadQueue::Uninitialized; - let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?; + let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?; let tli = make_timeline(); // Enqueue three versions of the same layer, with different file sizes. @@ -778,7 +791,7 @@ mod tests { #[test] fn schedule_upload_delete_conflicts() -> anyhow::Result<()> { let mut queue = UploadQueue::Uninitialized; - let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?; + let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?; let tli = make_timeline(); // Enqueue two layer uploads, with a delete of both layers in between them. These should be @@ -817,7 +830,7 @@ mod tests { #[test] fn schedule_upload_delete_conflicts_bypass() -> anyhow::Result<()> { let mut queue = UploadQueue::Uninitialized; - let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?; + let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?; let tli = make_timeline(); // Enqueue two layer uploads, with a delete of both layers in between them. These should be @@ -859,7 +872,7 @@ mod tests { #[test] fn schedule_upload_parallel() -> anyhow::Result<()> { let mut queue = UploadQueue::Uninitialized; - let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?; + let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?; let tli = make_timeline(); // Enqueue three different layer uploads. 
@@ -888,7 +901,7 @@ mod tests { #[test] fn schedule_index_serial() -> anyhow::Result<()> { let mut queue = UploadQueue::Uninitialized; - let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?; + let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?; // Enqueue three uploads of the current empty index. let index = Box::new(queue.clean.0.clone()); @@ -925,7 +938,7 @@ mod tests { #[test] fn schedule_index_upload_chain() -> anyhow::Result<()> { let mut queue = UploadQueue::Uninitialized; - let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?; + let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?; let tli = make_timeline(); // Enqueue three uploads of the current empty index. @@ -994,7 +1007,7 @@ mod tests { #[test] fn schedule_index_delete_dereferenced() -> anyhow::Result<()> { let mut queue = UploadQueue::Uninitialized; - let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?; + let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?; let tli = make_timeline(); // Create a layer to upload. @@ -1038,7 +1051,7 @@ mod tests { #[test] fn schedule_index_upload_dereferenced() -> anyhow::Result<()> { let mut queue = UploadQueue::Uninitialized; - let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?; + let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?; let tli = make_timeline(); // Create a layer to upload. 
@@ -1085,7 +1098,7 @@ mod tests { #[test] fn schedule_shutdown() -> anyhow::Result<()> { let mut queue = UploadQueue::Uninitialized; - let queue = queue.initialize_empty_remote(&TimelineMetadata::example())?; + let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 0)?; let tli = make_timeline(); let index = Box::new(queue.clean.0.clone()); // empty, doesn't matter @@ -1139,6 +1152,42 @@ mod tests { Ok(()) } + /// Scheduling respects inprogress_limit. + #[test] + fn schedule_inprogress_limit() -> anyhow::Result<()> { + // Create a queue with inprogress_limit=2. + let mut queue = UploadQueue::Uninitialized; + let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 2)?; + let tli = make_timeline(); + + // Enqueue a bunch of uploads. + let layer0 = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + let layer1 = make_layer(&tli, "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + let layer2 = make_layer(&tli, "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + let layer3 = make_layer(&tli, "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"); + + let ops = [ + UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None), + UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None), + UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None), + UploadOp::UploadLayer(layer3.clone(), layer3.metadata(), None), + ]; + + queue.queued_operations.extend(ops.clone()); + + // Schedule all ready operations. Only 2 are scheduled. + let tasks = queue.schedule_ready(); + assert_same_ops(tasks.iter().map(|t| &t.op), &ops[0..2]); + assert!(queue.next_ready().is_none()); + + // When one completes, another is scheduled. 
+ queue.complete(tasks[0].task_id); + let tasks = queue.schedule_ready(); + assert_same_ops(tasks.iter().map(|t| &t.op), &ops[2..3]); + + Ok(()) + } + /// Tests that can_bypass takes name, generation and shard index into account for all operations. #[test] fn can_bypass_path() -> anyhow::Result<()> {