pageserver: reorder upload queue when possible
erikgrinaker committed Dec 30, 2024
1 parent d2ae0f6 commit c09b6a7
Showing 3 changed files with 160 additions and 87 deletions.
109 changes: 34 additions & 75 deletions pageserver/src/tenant/remote_timeline_client.rs
@@ -1797,57 +1797,23 @@ impl RemoteTimelineClient {
Ok(())
}

///
/// Pick next tasks from the queue, and start as many of them as possible without violating
/// the ordering constraints.
///
/// The caller needs to already hold the `upload_queue` lock.
/// TODO: consider limiting the number of in-progress tasks, beyond what remote_storage does,
/// to avoid tenants starving other tenants.
fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
while let Some(next_op) = upload_queue.queued_operations.front() {
// Can we run this task now?
let can_run_now = match next_op {
UploadOp::UploadLayer(..) => {
// Can always be scheduled.
true
}
UploadOp::UploadMetadata { .. } => {
// These can only be performed after all the preceding operations
// have finished.
upload_queue.inprogress_tasks.is_empty()
}
UploadOp::Delete(..) => {
// Wait for preceding uploads to finish. Concurrent deletions are OK, though.
upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
}

UploadOp::Barrier(_) | UploadOp::Shutdown => {
upload_queue.inprogress_tasks.is_empty()
}
};

// If we cannot launch this task, don't look any further.
//
// In some cases, we could let some non-frontmost tasks "jump the queue" and launch
// them now, but we don't try to do that currently. For example, if the frontmost task
// is an index-file upload that cannot proceed until preceding uploads have finished, we
// could still start layer uploads that were scheduled later.
if !can_run_now {
break;
}

if let UploadOp::Shutdown = next_op {
// Leave the op in the queue but do not start more tasks; it will be dropped when
// stop is called.
upload_queue.shutdown_ready.close();
break;
}

// We can launch this task. Remove it from the queue first.
let mut next_op = upload_queue.queued_operations.pop_front().unwrap();
// Check for a shutdown. Leave it in the queue, but don't start more tasks. It will be
// dropped on stop.
if let Some(UploadOp::Shutdown) = upload_queue.queued_operations.front() {
upload_queue.shutdown_ready.close();
return;
}

debug!("starting op: {}", next_op);
while let Some(mut next_op) = upload_queue.next_ready() {
debug!("starting op: {next_op}");

// Update the counters and prepare
// Prepare upload.
match &mut next_op {
UploadOp::UploadLayer(layer, meta, mode) => {
if upload_queue
@@ -1858,18 +1824,14 @@ impl RemoteTimelineClient {
} else {
*mode = Some(OpType::MayReorder)
}
upload_queue.num_inprogress_layer_uploads += 1;
}
UploadOp::UploadMetadata { .. } => {
upload_queue.num_inprogress_metadata_uploads += 1;
}
UploadOp::UploadMetadata { .. } => {}
UploadOp::Delete(Delete { layers }) => {
for (name, meta) in layers {
upload_queue
.recently_deleted
.insert((name.clone(), meta.generation));
}
upload_queue.num_inprogress_deletions += 1;
}
UploadOp::Barrier(sender) => {
sender.send_replace(());
@@ -1967,6 +1929,8 @@ impl RemoteTimelineClient {

let upload_result: anyhow::Result<()> = match &task.op {
UploadOp::UploadLayer(ref layer, ref layer_metadata, mode) => {
// TODO: check if this mechanism can be removed now that can_bypass() performs
// conflict checks during scheduling.
if let Some(OpType::FlushDeletion) = mode {
if self.config.read().unwrap().block_deletions {
// Of course, this is not efficient... but usually the queue should be empty.
@@ -2189,13 +2153,8 @@ impl RemoteTimelineClient {
upload_queue.inprogress_tasks.remove(&task.task_id);

let lsn_update = match task.op {
UploadOp::UploadLayer(_, _, _) => {
upload_queue.num_inprogress_layer_uploads -= 1;
None
}
UploadOp::UploadLayer(_, _, _) => None,
UploadOp::UploadMetadata { ref uploaded } => {
upload_queue.num_inprogress_metadata_uploads -= 1;

// the task id is reused as a monotonicity check for storing the "clean"
// IndexPart.
let last_updater = upload_queue.clean.1;
@@ -2229,10 +2188,7 @@ impl RemoteTimelineClient {
None
}
}
UploadOp::Delete(_) => {
upload_queue.num_inprogress_deletions -= 1;
None
}
UploadOp::Delete(_) => None,
UploadOp::Barrier(..) | UploadOp::Shutdown => unreachable!(),
};

@@ -2356,9 +2312,6 @@ impl RemoteTimelineClient {
visible_remote_consistent_lsn: initialized
.visible_remote_consistent_lsn
.clone(),
num_inprogress_layer_uploads: 0,
num_inprogress_metadata_uploads: 0,
num_inprogress_deletions: 0,
inprogress_tasks: HashMap::default(),
queued_operations: VecDeque::default(),
#[cfg(feature = "testing")]
@@ -2385,14 +2338,6 @@
}
};

// consistency check
assert_eq!(
qi.num_inprogress_layer_uploads
+ qi.num_inprogress_metadata_uploads
+ qi.num_inprogress_deletions,
qi.inprogress_tasks.len()
);

// We don't need to do anything here for in-progress tasks. They will finish
// on their own, decrement the unfinished-task counter themselves, and observe
// that the queue is Stopped.
@@ -2531,6 +2476,20 @@ pub fn remote_layer_path(
RemotePath::from_string(&path).expect("Failed to construct path")
}

/// Returns true if a and b have the same layer path within a tenant/timeline.
///
/// TODO: there should be a variant of LayerName for the physical path that contains information
/// about the shard and generation, such that this could be replaced by a simple comparison.
/// Effectively remote_layer_path(a) == remote_layer_path(b) but without the string allocations.
pub fn is_same_layer_path(
aname: &LayerName,
ameta: &LayerFileMetadata,
bname: &LayerName,
bmeta: &LayerFileMetadata,
) -> bool {
aname == bname && ameta.shard == bmeta.shard && ameta.generation == bmeta.generation
}

pub fn remote_initdb_archive_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> RemotePath {
RemotePath::from_string(&format!(
"tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{INITDB_PATH}"
@@ -2824,8 +2783,8 @@ mod tests {
let mut guard = client.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut().unwrap();
assert!(upload_queue.queued_operations.is_empty());
assert!(upload_queue.inprogress_tasks.len() == 2);
assert!(upload_queue.num_inprogress_layer_uploads == 2);
assert_eq!(upload_queue.inprogress_tasks.len(), 2);
assert_eq!(upload_queue.num_inprogress_layer_uploads(), 2);

// also check that `latest_file_changes` was updated
assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 2);
@@ -2895,8 +2854,8 @@
// Deletion schedules upload of the index file, and the file deletion itself
assert_eq!(upload_queue.queued_operations.len(), 2);
assert_eq!(upload_queue.inprogress_tasks.len(), 1);
assert_eq!(upload_queue.num_inprogress_layer_uploads, 1);
assert_eq!(upload_queue.num_inprogress_deletions, 0);
assert_eq!(upload_queue.num_inprogress_layer_uploads(), 1);
assert_eq!(upload_queue.num_inprogress_deletions(), 0);
assert_eq!(
upload_queue.latest_files_changes_since_metadata_upload_scheduled,
0
2 changes: 1 addition & 1 deletion pageserver/src/tenant/remote_timeline_client/index.rs
@@ -152,7 +152,7 @@ impl IndexPart {
let Some(index_metadata) = self.layer_metadata.get(name) else {
return false;
};
metadata.shard == index_metadata.shard && metadata.generation == index_metadata.generation
super::is_same_layer_path(name, metadata, name, index_metadata)
}
}

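
The two hunks above hinge on a single identity rule for remote layer objects: two layer files denote the same object only when name, shard, and generation all agree, and an index references a layer only under that same rule. Below is a minimal, self-contained sketch of both checks with simplified stand-in types (LayerName, ShardIndex, Generation, and the plain HashMap index are stubs here, not the pageserver definitions):

use std::collections::HashMap;

// Simplified stand-ins for the pageserver types; only the fields that take part
// in the identity comparison are modeled.
#[derive(PartialEq, Eq, Hash, Clone)]
struct LayerName(String);
#[derive(PartialEq, Clone, Copy)]
struct ShardIndex(u8);
#[derive(PartialEq, Clone, Copy)]
struct Generation(u32);

#[derive(Clone, Copy)]
struct LayerFileMetadata {
    shard: ShardIndex,
    generation: Generation,
}

// Same rule as is_same_layer_path: equal name, shard, and generation.
fn is_same_layer_path(
    aname: &LayerName,
    ameta: &LayerFileMetadata,
    bname: &LayerName,
    bmeta: &LayerFileMetadata,
) -> bool {
    aname == bname && ameta.shard == bmeta.shard && ameta.generation == bmeta.generation
}

// An index references a layer iff it maps the same name to the same shard and
// generation, mirroring the IndexPart::references change in this commit.
fn references(
    index: &HashMap<LayerName, LayerFileMetadata>,
    name: &LayerName,
    meta: &LayerFileMetadata,
) -> bool {
    index.get(name).is_some_and(|m| is_same_layer_path(name, meta, name, m))
}

fn main() {
    let name = LayerName("layer-000".into());
    let gen7 = LayerFileMetadata { shard: ShardIndex(0), generation: Generation(7) };
    let gen8 = LayerFileMetadata { shard: ShardIndex(0), generation: Generation(8) };
    // Same name, different generation: distinct remote objects, so an upload of one
    // and a delete of the other do not conflict and may be reordered.
    assert!(!is_same_layer_path(&name, &gen7, &name, &gen8));
    let index = HashMap::from([(name.clone(), gen7)]);
    assert!(references(&index, &name, &gen7));
    assert!(!references(&index, &name, &gen8));
}
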
136 changes: 125 additions & 11 deletions pageserver/src/tenant/upload_queue.rs
@@ -1,3 +1,5 @@
use super::remote_timeline_client::is_same_layer_path;
use super::storage_layer::AsLayerDesc as _;
use super::storage_layer::LayerName;
use super::storage_layer::ResidentLayer;
use crate::tenant::metadata::TimelineMetadata;
@@ -70,11 +72,6 @@ pub(crate) struct UploadQueueInitialized {
/// we skip validation)
pub(crate) visible_remote_consistent_lsn: Arc<AtomicLsn>,

// Breakdown of different kinds of tasks currently in-progress
pub(crate) num_inprogress_layer_uploads: usize,
pub(crate) num_inprogress_metadata_uploads: usize,
pub(crate) num_inprogress_deletions: usize,

/// Tasks that are currently in-progress. In-progress means that a tokio Task
/// has been launched for it. An in-progress task can be busy uploading, but it can
/// also be waiting on the `concurrency_limiter` Semaphore in S3Bucket, or it can
@@ -122,6 +119,76 @@ impl UploadQueueInitialized {
let lsn = self.clean.0.metadata.disk_consistent_lsn();
self.clean.1.map(|_| lsn)
}

/// Returns the number of in-progress deletion operations.
#[cfg(test)]
pub(crate) fn num_inprogress_deletions(&self) -> usize {
self.inprogress_tasks
.iter()
.filter(|(_, t)| matches!(t.op, UploadOp::Delete(_)))
.count()
}

/// Returns the number of in-progress layer uploads.
#[cfg(test)]
pub(crate) fn num_inprogress_layer_uploads(&self) -> usize {
self.inprogress_tasks
.iter()
.filter(|(_, t)| matches!(t.op, UploadOp::UploadLayer(_, _, _)))
.count()
}

/// Returns and removes the next ready operation from the queue, if any. This isn't necessarily
/// the first operation in the queue, to avoid head-of-line blocking -- an operation can jump
/// the queue if it doesn't conflict with operations ahead of it.
///
/// None may be returned even if the queue isn't empty, if no operations are ready yet.
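///
/// For example (hypothetical queue): given an in-progress UploadLayer(A) and queued
/// [UploadMetadata(i), UploadLayer(B), Delete(A)] where index i references A, the
/// index upload must wait for A, UploadLayer(B) bypasses it and is returned, and
/// Delete(A) stays queued because it conflicts with the in-progress upload of A.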
pub(crate) fn next_ready(&mut self) -> Option<UploadOp> {
// TODO: this is quadratic. Is that a problem? Consider optimizing.
for (i, candidate) in self.queued_operations.iter().enumerate() {
// If this candidate is ready, go for it. Otherwise, try the next one.
if self.is_ready(i) {
return self.queued_operations.remove(i);
}

// Nothing can bypass a barrier or shutdown. If it wasn't scheduled above, give up.
if matches!(candidate, UploadOp::Barrier(_) | UploadOp::Shutdown) {
return None;
}
}
None
}

/// Returns true if the queued operation at the given position is ready to be uploaded, i.e. if
/// it doesn't conflict with any in-progress or queued operations ahead of it. Operations are
/// allowed to skip the queue when it's safe to do so, to increase parallelism.
///
/// The position must be valid for the queue size.
fn is_ready(&self, pos: usize) -> bool {
let candidate = self.queued_operations.get(pos).expect("invalid position");
self
// Look at in-progress operations, in random order.
.inprogress_tasks
.values()
.map(|task| &task.op)
// Then queued operations ahead of the candidate, front-to-back.
.chain(self.queued_operations.iter().take(pos))
// Keep track of the active index ahead of each operation. This is used to ensure that
// an upload doesn't skip the queue too far, such that it modifies a layer that's
// referenced by the active index.
//
// It's okay that in-progress operations are emitted in random order above, since at
// most one of them can be an index upload (enforced by can_bypass).
.scan(&self.clean.0, |next_active_index, op| {
let active_index = *next_active_index;
if let UploadOp::UploadMetadata { ref uploaded } = op {
*next_active_index = uploaded; // stash index for next operation after this
}
Some((op, active_index))
})
// Check if the candidate can bypass all of them.
.all(|(op, active_index)| candidate.can_bypass(op, active_index))
}
}

#[derive(Clone, Copy)]
@@ -185,9 +252,6 @@ impl UploadQueue {
visible_remote_consistent_lsn: Arc::new(AtomicLsn::new(0)),
// what follows are boring default initializations
task_counter: 0,
num_inprogress_layer_uploads: 0,
num_inprogress_metadata_uploads: 0,
num_inprogress_deletions: 0,
inprogress_tasks: HashMap::new(),
queued_operations: VecDeque::new(),
#[cfg(feature = "testing")]
@@ -227,9 +291,6 @@
),
// what follows are boring default initializations
task_counter: 0,
num_inprogress_layer_uploads: 0,
num_inprogress_metadata_uploads: 0,
num_inprogress_deletions: 0,
inprogress_tasks: HashMap::new(),
queued_operations: VecDeque::new(),
#[cfg(feature = "testing")]
@@ -338,3 +399,56 @@ impl std::fmt::Display for UploadOp {
}
}
}

impl UploadOp {
/// Returns true if self can bypass other, i.e. if the operations don't conflict. index is the
/// active index when other would be uploaded -- if we allow self to bypass other, this would
/// be the active index when self is uploaded.
pub fn can_bypass(&self, other: &UploadOp, index: &IndexPart) -> bool {
match (self, other) {
// Nothing can bypass a barrier or shutdown, and they can't bypass anything.
(UploadOp::Barrier(_), _) | (_, UploadOp::Barrier(_)) => false,
(UploadOp::Shutdown, _) | (_, UploadOp::Shutdown) => false,

// Uploads and deletes can bypass each other unless they're for the same file.
(UploadOp::UploadLayer(a, ameta, _), UploadOp::UploadLayer(b, bmeta, _)) => {
let aname = &a.layer_desc().layer_name();
let bname = &b.layer_desc().layer_name();
!is_same_layer_path(aname, ameta, bname, bmeta)
}
(UploadOp::UploadLayer(u, umeta, _), UploadOp::Delete(d))
| (UploadOp::Delete(d), UploadOp::UploadLayer(u, umeta, _)) => {
d.layers.iter().all(|(dname, dmeta)| {
!is_same_layer_path(&u.layer_desc().layer_name(), umeta, dname, dmeta)
})
}

// Deletes are idempotent and can always bypass each other.
(UploadOp::Delete(_), UploadOp::Delete(_)) => true,

// Uploads and deletes can bypass an index upload as long as neither the uploaded index
// nor the active index below it references the file. A layer can't be modified or
// deleted while referenced by an index.
//
// Similarly, index uploads can bypass uploads and deletes as long as neither the
// uploaded index nor the active index references the file (the latter would be
// incorrect use by the caller).
(UploadOp::UploadLayer(u, umeta, _), UploadOp::UploadMetadata { uploaded: i })
| (UploadOp::UploadMetadata { uploaded: i }, UploadOp::UploadLayer(u, umeta, _)) => {
let uname = u.layer_desc().layer_name();
!i.references(&uname, umeta) && !index.references(&uname, umeta)
}
(UploadOp::Delete(d), UploadOp::UploadMetadata { uploaded: i })
| (UploadOp::UploadMetadata { uploaded: i }, UploadOp::Delete(d)) => {
d.layers.iter().all(|(dname, dmeta)| {
!i.references(dname, dmeta) && !index.references(dname, dmeta)
})
}

// Indexes can never bypass each other.
// TODO: we could coalesce them though, by only uploading the newest ready index. This
// is left for later, out of caution.
(UploadOp::UploadMetadata { .. }, UploadOp::UploadMetadata { .. }) => false,
}
}
}
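
Taken together, next_ready/is_ready and can_bypass form a conflict-aware scheduler: an operation may start out of order only if it conflicts with nothing in progress or ahead of it in the queue. Below is a self-contained toy model of that algorithm for experimentation -- the types are deliberately simplified (layers are plain strings, an index is just the set of layer names it references, and Shutdown is omitted), so this is a sketch of the scheduling rules rather than the pageserver implementation:

use std::collections::{HashSet, VecDeque};

// Toy stand-ins for UploadOp: a layer is just a name, an index is the set of
// layer names it references.
#[derive(Debug)]
enum Op {
    UploadLayer(String),
    Delete(String),
    UploadIndex(HashSet<String>),
    Barrier,
}

// Mirrors UploadOp::can_bypass on the toy types. `index` is the index that would
// be active when `other` runs.
fn can_bypass(op: &Op, other: &Op, index: &HashSet<String>) -> bool {
    use Op::*;
    match (op, other) {
        // Nothing can bypass a barrier, and a barrier can't bypass anything.
        (Barrier, _) | (_, Barrier) => false,
        // Uploads and deletes conflict only when they touch the same file.
        (UploadLayer(a), UploadLayer(b)) => a != b,
        (UploadLayer(a), Delete(b)) | (Delete(b), UploadLayer(a)) => a != b,
        // Deletes are idempotent and can always bypass each other.
        (Delete(_), Delete(_)) => true,
        // A layer op may bypass an index upload (or vice versa) only if neither
        // the uploaded index nor the active index references the layer.
        (UploadLayer(a), UploadIndex(i)) | (UploadIndex(i), UploadLayer(a)) => {
            !i.contains(a) && !index.contains(a)
        }
        (Delete(d), UploadIndex(i)) | (UploadIndex(i), Delete(d)) => {
            !i.contains(d) && !index.contains(d)
        }
        // Index uploads never reorder relative to each other.
        (UploadIndex(_), UploadIndex(_)) => false,
    }
}

// Mirrors next_ready/is_ready: return the first queued op that can bypass every
// in-progress op and every queued op ahead of it, threading the active index
// forward past any index uploads it skips.
fn next_ready(
    inprogress: &[Op],
    queued: &mut VecDeque<Op>,
    clean_index: &HashSet<String>,
) -> Option<Op> {
    for pos in 0..queued.len() {
        let ready = {
            let candidate = &queued[pos];
            let mut active = clean_index;
            inprogress.iter().chain(queued.iter().take(pos)).all(|op| {
                let ok = can_bypass(candidate, op, active);
                if let Op::UploadIndex(i) = op {
                    active = i; // ops after this one see the newer index
                }
                ok
            })
        };
        if ready {
            return queued.remove(pos);
        }
        if matches!(queued[pos], Op::Barrier) {
            return None; // nothing may jump past a barrier
        }
    }
    None
}

fn main() {
    let clean = HashSet::new();
    let inprogress = vec![Op::UploadLayer("a".into())];
    let mut queued = VecDeque::from(vec![
        Op::UploadIndex(HashSet::from(["a".to_string()])),
        Op::UploadLayer("b".into()),
        Op::Delete("a".into()),
    ]);
    // The index upload must wait for layer "a", but layer "b" jumps the queue.
    let next = next_ready(&inprogress, &mut queued, &clean);
    println!("{next:?}"); // Some(UploadLayer("b"))
}

Running it prints Some(UploadLayer("b")): the queued index upload is blocked by the in-progress upload of "a", while layer "b" safely jumps the queue, matching the example in the next_ready doc comment.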
