diff --git a/all-is-cubes-mesh/src/dynamic/blocks.rs b/all-is-cubes-mesh/src/dynamic/blocks.rs index 1e11262b3..6d4c89e41 100644 --- a/all-is-cubes-mesh/src/dynamic/blocks.rs +++ b/all-is-cubes-mesh/src/dynamic/blocks.rs @@ -143,7 +143,12 @@ where ); if defer { - vbm.spawn_update_job(evaluated.clone(), mesh_options.clone(), &self.jobs); + vbm.spawn_update_job( + index, + evaluated.clone(), + mesh_options.clone(), + &self.jobs, + ); } self.meshes.push(vbm); @@ -167,6 +172,7 @@ where // Updated the texture in-place. No need for mesh updates. } else { current_mesh_entry.spawn_update_job( + block_index, new_evaluated_block.clone(), mesh_options.clone(), &self.jobs, @@ -178,9 +184,9 @@ where let mut completed_job_stats = TimeStats::default(); let mut callback_stats = TimeStats::default(); let process_completed_jobs = || { - // TODO: Instead of scanning everything, have the job queue notify us which ones - // have completed jobs. - for (block_index, current_mesh_entry) in self.meshes.iter_mut().enumerate() { + for block_index in self.jobs.take_completed() { + let uindex = usize::from(block_index); + let current_mesh_entry = &mut self.meshes[uindex]; match current_mesh_entry.try_recv_update() { Some(Ok(job::CompletedMeshJob { mesh: new_block_mesh, @@ -188,8 +194,7 @@ where })) => { completed_job_stats += TimeStats::one(compute_time); - let new_evaluated_block: &EvaluatedBlock = - block_data[block_index].evaluated(); + let new_evaluated_block: &EvaluatedBlock = block_data[uindex].evaluated(); // Only invalidate the chunks if we actually have different data. // Note: This comparison depends on such things as the definition of @@ -205,7 +210,7 @@ where // TODO: reuse old render data let start_callback_time = M::Instant::now(); *current_mesh_entry = VersionedBlockMesh::new( - block_index as BlockIndex, + block_index, new_evaluated_block, new_block_mesh, current_version_number, @@ -226,7 +231,8 @@ where // If the job was cancelled, reschedule it. Some(Err(Canceled)) => current_mesh_entry.spawn_update_job( - block_data[block_index].evaluated().clone(), + block_index, + block_data[uindex].evaluated().clone(), mesh_options.clone(), &self.jobs, ), @@ -374,11 +380,12 @@ impl VersionedBlockMesh { fn spawn_update_job( &mut self, + block_index: BlockIndex, block: EvaluatedBlock, mesh_options: MeshOptions, jobs: &job::QueueOwner, ) { - let response_receiver = jobs.send(block, mesh_options); + let response_receiver = jobs.send(block_index, block, mesh_options); let old_job = self.pending_latest.replace(response_receiver); diff --git a/all-is-cubes-mesh/src/dynamic/job.rs b/all-is-cubes-mesh/src/dynamic/job.rs index 1a0aa9705..a8d92f233 100644 --- a/all-is-cubes-mesh/src/dynamic/job.rs +++ b/all-is-cubes-mesh/src/dynamic/job.rs @@ -7,6 +7,7 @@ use futures_channel::oneshot; use futures_util::FutureExt as _; use all_is_cubes::block::EvaluatedBlock; +use all_is_cubes::space::BlockIndex; use all_is_cubes::time::{self, Instant}; use all_is_cubes::util::{Refmt as _, StatusText}; @@ -163,6 +164,7 @@ impl QueueOwner { pub(in crate::dynamic) fn send( &self, + block_index: BlockIndex, block: EvaluatedBlock, mesh_options: MeshOptions, ) -> Receiver> { @@ -172,12 +174,21 @@ impl QueueOwner { block, mesh_options, response: response_sender, - counter_ticket: state::Ticket::new(self.counters.clone(), state::State::Queued), + counter_ticket: state::Ticket::new( + self.counters.clone(), + block_index, + state::State::Queued, + ), }) .expect("job queue should never be defunct or full"); Receiver { receiver } } + /// Returns all block indices which have had a job complete since the last time this was called. + pub(crate) fn take_completed(&self) -> Vec { + self.counters.take_completed() + } + /// Run jobs in the queue, wait for jobs to complete, or both, /// until the deadline is past or there is no work remaining. /// @@ -289,9 +300,9 @@ pub(in crate::dynamic) struct CompletedMeshJob { } mod state { + use super::*; + use std::collections::HashSet; use std::sync::{Arc, Condvar, Mutex}; - #[allow(unused_imports)] // conditionally used - use std::time::Duration; /// Shared structure recording how many jobs are in a given state. /// Fields correspond to variants of [`JobState`]. @@ -300,6 +311,8 @@ mod state { queued: usize, taken: usize, completed: usize, + + completed_ids: HashSet, } impl InnerCounters { @@ -326,6 +339,7 @@ mod state { queued: 0, taken: 0, completed: 0, + completed_ids: HashSet::new(), }), endings: Condvar::new(), } @@ -351,6 +365,15 @@ mod state { } } } + + pub fn take_completed(&self) -> Vec { + self.counters + .lock() + .unwrap() + .completed_ids + .drain() + .collect() + } } #[derive(Clone, Copy, Debug, Eq, PartialEq)] @@ -363,21 +386,24 @@ mod state { /// A handle on one count in [`JobStateCounters`], which makes sure to decrement /// on change or drop. pub(super) struct Ticket { + /// TODO: When we have background chunk meshing too, this will need to become a more general + /// "JobId" enum type + block_index: BlockIndex, current_state: State, shared: Arc, } impl Ticket { - pub fn new(shared: Arc, current_state: State) -> Self { + pub fn new(shared: Arc, block_index: BlockIndex, current_state: State) -> Self { + // avoid complexity by not supporting this case + assert!(current_state != State::Completed); + { let counters = &mut *shared.counters.lock().unwrap(); *counters.field(current_state) += 1; - - if current_state == State::Completed { - shared.endings.notify_all(); - } } Self { + block_index, current_state, shared, } @@ -387,6 +413,8 @@ mod state { if new_state == self.current_state { return; } + assert!(self.current_state != State::Completed); + let old_state = self.current_state; let counters = &mut *self.shared.counters.lock().unwrap(); @@ -396,6 +424,7 @@ mod state { self.current_state = new_state; if new_state == State::Completed { + counters.completed_ids.insert(self.block_index); self.shared.endings.notify_all(); } }