Skip to content

Commit

Permalink
Use EnumMap for requested goals
Browse files Browse the repository at this point in the history
  • Loading branch information
wks committed Feb 8, 2024
1 parent 7a2074e commit eb1d8df
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 104 deletions.
6 changes: 6 additions & 0 deletions src/global_state.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Mutex;
use std::time::Instant;

use atomic_refcell::AtomicRefCell;

/// This stores some global states for an MMTK instance.
/// Some MMTK components like plans and allocators may keep an reference to the struct, and can access it.
Expand All @@ -15,6 +18,8 @@ pub struct GlobalState {
pub(crate) initialized: AtomicBool,
/// The current GC status.
pub(crate) gc_status: Mutex<GcStatus>,
/// When did the last GC start? Only accessed by the last parked worker.
pub(crate) gc_start_time: AtomicRefCell<Option<Instant>>,
/// Is the current GC an emergency collection? Emergency means we may run out of memory soon, and we should
/// attempt to collect as much as we can.
pub(crate) emergency_collection: AtomicBool,
Expand Down Expand Up @@ -195,6 +200,7 @@ impl Default for GlobalState {
Self {
initialized: AtomicBool::new(false),
gc_status: Mutex::new(GcStatus::NotInGC),
gc_start_time: AtomicRefCell::new(None),
stacks_prepared: AtomicBool::new(false),
emergency_collection: AtomicBool::new(false),
user_triggered_collection: AtomicBool::new(false),
Expand Down
77 changes: 43 additions & 34 deletions src/scheduler/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
self.worker_group.prepare_surrender_buffer();

debug!("A mutator is requesting GC threads to stop for forking...");
self.worker_monitor
.make_request(|requests| requests.stop_for_fork.set());
self.worker_monitor.make_request(WorkerGoal::StopForFork);
}

/// Surrender the `GCWorker` struct of a GC worker when it exits.
Expand Down Expand Up @@ -130,8 +129,7 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
/// Request a GC to be scheduled. Called by mutator via `GCRequester`.
pub(crate) fn request_schedule_collection(&self) {
debug!("A mutator is sending GC-scheduling request to workers...");
self.worker_monitor
.make_request(|requests| requests.gc.set());
self.worker_monitor.make_request(WorkerGoal::Gc);
}

/// Add the `ScheduleCollection` packet. Called by the last parked worker.
Expand Down Expand Up @@ -417,19 +415,19 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
/// Called when the last worker parked.
/// `should_schedule_gc` is true if a mutator requested a GC.
fn on_last_parked(&self, worker: &GCWorker<VM>, goals: &mut WorkerGoals) -> LastParkedResult {
let Some(ref current_goal) = goals.current else {
let Some(ref current_goal) = goals.current() else {
// There is no goal. Find a request to respond to.
return self.respond_to_requests(goals);
return self.respond_to_requests(worker, goals);
};

match current_goal {
WorkerGoal::Gc { start_time } => {
WorkerGoal::Gc => {
// We are in the progress of GC.

// In stop-the-world GC, mutators cannot request for GC while GC is in progress.
// When we support concurrent GC, we should remove this assertion.
assert!(
!goals.requests.gc.debug_is_set(),
!goals.debug_is_requested(WorkerGoal::Gc),
"GC request sent to WorkerMonitor while GC is still in progress."
);

Expand All @@ -446,11 +444,11 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
LastParkedResult::WakeAll
} else {
// GC finished.
self.on_gc_finished(worker, start_time);
self.on_gc_finished(worker);

// Clear the current goal
goals.current = None;
self.respond_to_requests(goals)
goals.on_current_goal_completed();
self.respond_to_requests(worker, goals)
}
}
WorkerGoal::StopForFork => {
Expand All @@ -461,33 +459,40 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
}

/// Respond to a worker reqeust.
fn respond_to_requests(&self, goals: &mut WorkerGoals) -> LastParkedResult {
assert!(goals.current.is_none());

if goals.requests.gc.poll() {
trace!("A mutator requested a GC to be scheduled.");

// We set the eBPF trace point here so that bpftrace scripts can start recording work
// packet events before the `ScheduleCollection` work packet starts.
probe!(mmtk, gc_start);
fn respond_to_requests(
&self,
worker: &GCWorker<VM>,
goals: &mut WorkerGoals,
) -> LastParkedResult {
assert!(goals.current().is_none());

let Some(goal) = goals.poll_next_goal() else {
// No reqeusts. Park this worker, too.
return LastParkedResult::ParkSelf;
};

goals.current = Some(WorkerGoal::Gc {
start_time: Instant::now(),
});
match goal {
WorkerGoal::Gc => {
trace!("A mutator requested a GC to be scheduled.");

self.add_schedule_collection_packet();
return LastParkedResult::WakeSelf;
}
// We set the eBPF trace point here so that bpftrace scripts can start recording
// work packet events before the `ScheduleCollection` work packet starts.
probe!(mmtk, gc_start);

if goals.requests.stop_for_fork.poll() {
trace!("A mutator wanted to fork.");
{
let mut gc_start_time = worker.mmtk.state.gc_start_time.borrow_mut();
assert!(gc_start_time.is_none(), "GC already started?");
*gc_start_time = Some(Instant::now());
}

goals.current = Some(WorkerGoal::StopForFork);
return LastParkedResult::WakeAll;
self.add_schedule_collection_packet();
LastParkedResult::WakeSelf
}
WorkerGoal::StopForFork => {
trace!("A mutator wanted to fork.");
LastParkedResult::WakeAll
}
}

// No reqeusts. Park this worker, too.
LastParkedResult::ParkSelf
}

/// Find more work for workers to do. Return true if more work is available.
Expand Down Expand Up @@ -515,7 +520,7 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {

/// Called when GC has finished, i.e. when all work packets have been executed.
/// Return `true` if it scheduled the next GC immediately.
fn on_gc_finished(&self, worker: &GCWorker<VM>, start_time: &Instant) {
fn on_gc_finished(&self, worker: &GCWorker<VM>) {
// All GC workers must have parked by now.
debug_assert!(!self.worker_group.has_designated_work());
debug_assert!(self.all_buckets_empty());
Expand All @@ -530,6 +535,10 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
mmtk.gc_trigger.policy.on_gc_end(mmtk);

// Compute the elapsed time of the GC.
let start_time = {
let mut gc_start_time = worker.mmtk.state.gc_start_time.borrow_mut();
gc_start_time.take().expect("GC not started yet?")
};
let elapsed = start_time.elapsed();

info!(
Expand Down
99 changes: 46 additions & 53 deletions src/scheduler/worker_goals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,79 +7,72 @@
//!
//! - When in the progress of GC, the last parker will try to open buckets or announce the GC
//! has finished.
//! - When stopping for fork, every worker should exit when waken.
//! - When stopping for fork, every waken worker should save its thread state (giving in the
//! `GCWorker` struct) and exit.
//!
//! The struct `WorkerRequests` keeps a list of requests from mutators, such as requests for GC
//! and requests for forking. But the GC workers will only respond to one request at a time.
//! The struct `WorkerGoals` keeps the set of goals requested by mutators, but GC workers will only
//! respond to one request at a time, and will favor higher-priority goals.
use std::time::Instant;
use enum_map::{Enum, EnumMap};

/// This current and reqeusted goals.
#[derive(Default, Debug)]
pub(crate) struct WorkerGoals {
/// What are the workers doing now?
pub(crate) current: Option<WorkerGoal>,
/// Requests received from mutators.
pub(crate) requests: WorkerRequests,
/// The current goal.
current: Option<WorkerGoal>,
/// Requests received from mutators. `requests[goal]` is true if the `goal` is requested.
requests: EnumMap<WorkerGoal, bool>,
}

/// The thing workers are currently doing. This affects several things, such as what the last
/// parked worker will do, and whether workers will stop themselves.
#[derive(Debug)]
/// A goal, i.e. something that workers should work together to achieve.
///
/// Members of this `enum` should be listed from the highest priority to the lowest priority.
#[derive(Debug, Enum, Clone, Copy)]
pub(crate) enum WorkerGoal {
Gc {
start_time: Instant,
},
#[allow(unused)] // TODO: Implement forking support later.
/// Do a garbage collection.
Gc,
/// Stop all GC threads so that the VM can call `fork()`.
StopForFork,
}

/// Reqeusts received from mutators. Workers respond to those requests when they do not have a
/// current goal. Multiple things can be requested at the same time, and workers respond to the
/// thing with the highest priority.
///
/// The fields of this structs are ordered with decreasing priority.
#[derive(Default, Debug)]
pub(crate) struct WorkerRequests {
/// The VM needs to fork. Workers should save their contexts and exit.
pub(crate) stop_for_fork: WorkerRequest,
/// GC is requested. Workers should schedule a GC.
pub(crate) gc: WorkerRequest,
}

/// To record whether a specific goal has been reqeuested.
/// It is basically a wrapper of `bool`, but forces it to be accessed in a particular way.
#[derive(Default, Debug)] // Default: False by default.
pub(crate) struct WorkerRequest {
/// True if the goal has been requested.
requested: bool,
}

impl WorkerRequest {
/// Set the goal as requested. Return `true` if its requested state changed from `false` to
/// `true`.
pub fn set(&mut self) -> bool {
if !self.requested {
self.requested = true;
impl WorkerGoals {
/// Set the `goal` as requested. Return `true` if the requested state of the `goal` changed
/// from `false` to `true`.
pub fn set_request(&mut self, goal: WorkerGoal) -> bool {
if !self.requests[goal] {
self.requests[goal] = true;
true
} else {
false
}
}

/// Get the requested state and clear it. Return `true` if the requested state was `true`.
pub fn poll(&mut self) -> bool {
if self.requested {
self.requested = false;
true
} else {
false
/// Move the highest priority goal from the pending requests to the current request. Return
/// that goal, or `None` if no goal has been requested.
pub fn poll_next_goal(&mut self) -> Option<WorkerGoal> {
for (goal, requested) in self.requests.iter_mut() {
if *requested {
*requested = false;
self.current = Some(goal);
return Some(goal);
}
}
None
}

/// Get the current goal if exists.
pub fn current(&self) -> Option<WorkerGoal> {
self.current
}

/// Called when the current goal is completed. This will clear the current goal.
pub fn on_current_goal_completed(&mut self) {
self.current = None
}

/// Test if the request is set. For debug only. The last parked worker should use `poll` to
/// get the state and clear it.
pub fn debug_is_set(&self) -> bool {
self.requested
/// Test if the given `goal` is requested. Used for debug purpose, only. The workers always
/// respond to the request of the highest priority first.
pub fn debug_is_requested(&self, goal: WorkerGoal) -> bool {
self.requests[goal]
}
}
25 changes: 8 additions & 17 deletions src/scheduler/worker_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::sync::{Condvar, Mutex};
use crate::vm::VMBinding;

use super::{
worker_goals::{WorkerGoal, WorkerGoals, WorkerRequests},
worker_goals::{WorkerGoal, WorkerGoals},
GCWorker,
};

Expand Down Expand Up @@ -97,21 +97,12 @@ impl WorkerMonitor {
}
}

/// Make a request. The `callback` will be called while holding the mutex `self.sync` so that
/// the caller can set some of its fields to true.
///
/// The `callback` has one parameter:
///
/// - a mutable reference to the `WorkerRequests` instance.
///
/// The `callback` should return `true` if it set any field of `WorkerRequests` to true.
pub fn make_request<Callback>(&self, callback: Callback)
where
Callback: FnOnce(&mut WorkerRequests) -> bool,
{
/// Make a request. Can be called by a mutator to request the workers to work towards the
/// given `goal`.
pub fn make_request(&self, goal: WorkerGoal) {
let mut guard = self.sync.lock().unwrap();
let set_any_fields = callback(&mut guard.goals.requests);
if set_any_fields {
let newly_requested = guard.goals.set_request(goal);
if newly_requested {
self.notify_work_available(false);
}
}
Expand Down Expand Up @@ -236,12 +227,12 @@ impl WorkerMonitor {
);

// If the current goal is `StopForFork`, return true so that the worker thread will exit.
matches!(sync.goals.current, Some(WorkerGoal::StopForFork))
matches!(sync.goals.current(), Some(WorkerGoal::StopForFork))
}

/// Called when all workers have exited.
pub fn on_all_workers_exited(&self) {
let mut sync = self.sync.try_lock().unwrap();
sync.goals.current = None;
sync.goals.on_current_goal_completed();
}
}

0 comments on commit eb1d8df

Please sign in to comment.