Skip to content

Commit

Permalink
mesh: Track completed jobs instead of scanning all blocks.
Browse files Browse the repository at this point in the history
  • Loading branch information
kpreid committed Feb 18, 2024
1 parent b6917ba commit 4de77cf
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 17 deletions.
25 changes: 16 additions & 9 deletions all-is-cubes-mesh/src/dynamic/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,12 @@ where
);

if defer {
vbm.spawn_update_job(evaluated.clone(), mesh_options.clone(), &self.jobs);
vbm.spawn_update_job(
index,
evaluated.clone(),
mesh_options.clone(),
&self.jobs,
);
}

self.meshes.push(vbm);
Expand All @@ -167,6 +172,7 @@ where
// Updated the texture in-place. No need for mesh updates.
} else {
current_mesh_entry.spawn_update_job(
block_index,
new_evaluated_block.clone(),
mesh_options.clone(),
&self.jobs,
Expand All @@ -178,18 +184,17 @@ where
let mut completed_job_stats = TimeStats::default();
let mut callback_stats = TimeStats::default();
let process_completed_jobs = || {
// TODO: Instead of scanning everything, have the job queue notify us which ones
// have completed jobs.
for (block_index, current_mesh_entry) in self.meshes.iter_mut().enumerate() {
for block_index in self.jobs.take_completed() {
let uindex = usize::from(block_index);
let current_mesh_entry = &mut self.meshes[uindex];
match current_mesh_entry.try_recv_update() {
Some(Ok(job::CompletedMeshJob {
mesh: new_block_mesh,
compute_time,
})) => {
completed_job_stats += TimeStats::one(compute_time);

let new_evaluated_block: &EvaluatedBlock =
block_data[block_index].evaluated();
let new_evaluated_block: &EvaluatedBlock = block_data[uindex].evaluated();

// Only invalidate the chunks if we actually have different data.
// Note: This comparison depends on such things as the definition of
Expand All @@ -205,7 +210,7 @@ where
// TODO: reuse old render data
let start_callback_time = M::Instant::now();
*current_mesh_entry = VersionedBlockMesh::new(
block_index as BlockIndex,
block_index,
new_evaluated_block,
new_block_mesh,
current_version_number,
Expand All @@ -226,7 +231,8 @@ where

// If the job was cancelled, reschedule it.
Some(Err(Canceled)) => current_mesh_entry.spawn_update_job(
block_data[block_index].evaluated().clone(),
block_index,
block_data[uindex].evaluated().clone(),
mesh_options.clone(),
&self.jobs,
),
Expand Down Expand Up @@ -374,11 +380,12 @@ impl<M: DynamicMeshTypes> VersionedBlockMesh<M> {

fn spawn_update_job(
&mut self,
block_index: BlockIndex,
block: EvaluatedBlock,
mesh_options: MeshOptions,
jobs: &job::QueueOwner<M>,
) {
let response_receiver = jobs.send(block, mesh_options);
let response_receiver = jobs.send(block_index, block, mesh_options);

let old_job = self.pending_latest.replace(response_receiver);

Expand Down
45 changes: 37 additions & 8 deletions all-is-cubes-mesh/src/dynamic/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use futures_channel::oneshot;
use futures_util::FutureExt as _;

use all_is_cubes::block::EvaluatedBlock;
use all_is_cubes::space::BlockIndex;
use all_is_cubes::time::{self, Instant};
use all_is_cubes::util::{Refmt as _, StatusText};

Expand Down Expand Up @@ -163,6 +164,7 @@ impl<M: DynamicMeshTypes> QueueOwner<M> {

pub(in crate::dynamic) fn send(
&self,
block_index: BlockIndex,
block: EvaluatedBlock,
mesh_options: MeshOptions,
) -> Receiver<CompletedMeshJob<M>> {
Expand All @@ -172,12 +174,21 @@ impl<M: DynamicMeshTypes> QueueOwner<M> {
block,
mesh_options,
response: response_sender,
counter_ticket: state::Ticket::new(self.counters.clone(), state::State::Queued),
counter_ticket: state::Ticket::new(
self.counters.clone(),
block_index,
state::State::Queued,
),
})
.expect("job queue should never be defunct or full");
Receiver { receiver }
}

/// Returns all block indices which have had a job complete since the last time this was called.
pub(crate) fn take_completed(&self) -> Vec<BlockIndex> {
self.counters.take_completed()
}

/// Run jobs in the queue, wait for jobs to complete, or both,
/// until the deadline is past or there is no work remaining.
///
Expand Down Expand Up @@ -289,9 +300,9 @@ pub(in crate::dynamic) struct CompletedMeshJob<M: DynamicMeshTypes> {
}

mod state {
use super::*;
use std::collections::HashSet;
use std::sync::{Arc, Condvar, Mutex};
#[allow(unused_imports)] // conditionally used
use std::time::Duration;

/// Shared structure recording how many jobs are in a given state.
/// Fields correspond to variants of [`JobState`].
Expand All @@ -300,6 +311,8 @@ mod state {
queued: usize,
taken: usize,
completed: usize,

completed_ids: HashSet<BlockIndex>,
}

impl InnerCounters {
Expand All @@ -326,6 +339,7 @@ mod state {
queued: 0,
taken: 0,
completed: 0,
completed_ids: HashSet::new(),
}),
endings: Condvar::new(),
}
Expand All @@ -351,6 +365,15 @@ mod state {
}
}
}

pub fn take_completed(&self) -> Vec<BlockIndex> {
self.counters
.lock()
.unwrap()
.completed_ids
.drain()
.collect()
}
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
Expand All @@ -363,21 +386,24 @@ mod state {
/// A handle on one count in [`JobStateCounters`], which makes sure to decrement
/// on change or drop.
pub(super) struct Ticket {
/// TODO: When we have background chunk meshing too, this will need to become a more general
/// "JobId" enum type
block_index: BlockIndex,
current_state: State,
shared: Arc<Counters>,
}
impl Ticket {
pub fn new(shared: Arc<Counters>, current_state: State) -> Self {
pub fn new(shared: Arc<Counters>, block_index: BlockIndex, current_state: State) -> Self {
// avoid complexity by not supporting this case
assert!(current_state != State::Completed);

{
let counters = &mut *shared.counters.lock().unwrap();
*counters.field(current_state) += 1;

if current_state == State::Completed {
shared.endings.notify_all();
}
}

Self {
block_index,
current_state,
shared,
}
Expand All @@ -387,6 +413,8 @@ mod state {
if new_state == self.current_state {
return;
}
assert!(self.current_state != State::Completed);

let old_state = self.current_state;

let counters = &mut *self.shared.counters.lock().unwrap();
Expand All @@ -396,6 +424,7 @@ mod state {
self.current_state = new_state;

if new_state == State::Completed {
counters.completed_ids.insert(self.block_index);
self.shared.endings.notify_all();
}
}
Expand Down

0 comments on commit 4de77cf

Please sign in to comment.