Add documentation and reorganize files
jedel1043 committed Oct 18, 2023
1 parent f94c682 commit 179b6eb
Showing 6 changed files with 456 additions and 269 deletions.
293 changes: 291 additions & 2 deletions boa_engine/src/builtins/atomics/futex.rs
@@ -1,15 +1,161 @@
// Implementation mostly based on https://github.com/v8/v8/blob/main/src/execution/futex-emulation.cc
// TODO: track https://github.com/rust-lang/rfcs/pull/3467 to see if we can use `UnsafeAliased` instead
// of raw pointers.

// A bit of context about how exactly this thing works.
//
// `Atomics.wait/notify` is basically an emulation of the "futex" syscall, which internally uses
// a wait queue attached to a certain memory address that processes and threads can use to
// synchronize between themselves.
// More information: https://en.wikipedia.org/wiki/Futex
//
// Our emulation of the API is composed of three components (sketched right after this list):
//
// - `FutexWaiters`, which is a map of addresses to the corresponding wait queue for that address.
// Internally uses intrusive linked lists to avoid allocating when adding a new waiter, which
// reduces the time spent by a thread in the critical section.
//
// - `FutexWaiter`, which contains all the data necessary to be able to wake a waiter from another
// thread. It also contains a `waiting` boolean that is checked after waking up to see
// if the waiter was indeed notified or if it just woke up spuriously (yes, this is a thing that
// can happen per the documentation of `Condvar`).
//
// - `CRITICAL_SECTION`, a global static that must be locked before registering or notifying any
// waiter. This guarantees that only one agent can write to the wait queues at any point in time.
//
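// As a rough sketch (simplified here for illustration; the exact definitions live
// below in this file, and the intrusive adapter type name is abridged), the two
// main structures look like:
//
//     pub(crate) struct FutexWaiters {
//         // Wait queues, keyed by the address being waited on.
//         waiters: SmallMap<usize, LinkedList<FutexWaiterAdapter>>,
//     }
//
//     pub(crate) struct FutexWaiter {
//         link: LinkedListLink, // node of the intrusive wait queue
//         cond_var: Condvar,    // parks this agent until notified
//         waiting: bool,        // `false` once this waiter has been notified
//     }
//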
// We can walk through a typical execution using the API for demonstration purposes.
// At the start of the program we have an empty map of wait queues, which we represent
// graphically as:
//
// Address │
// │
// ────────────┼────────────────────────────────────────────────────────────────────
// │
// │
// <empty> │
// │
// │
//
// Each row here will represent an address and the corresponding wait queue for that address.
//
// Let's suppose that "Thread 2" wants to wait on the address 50. After locking the global mutex,
// it first creates a new instance of a `FutexWaiter` and passes a pointer to it to
// `FutexWaiters::add_waiter`:
//
// Address │
// │
// ────────────┼──────────────────────────────────────────────────────────────────────
// │
// │ ┌───────────────┐
// │ ┌─►│ │
// │ │ │ Thread 2 │
// │ │ │ FutexWaiter │
// 50 ├────┘ │ │
// │ │ │
// │ │ cond_var │
// │ │ waiting: true │
// │ │ │
// │ └───────────────┘
// │
//
// Immediately after this, "Thread 2" calls `cond_var.wait`, unlocking the global mutex, and
// sleeps until it is notified again (ignoring spurious wakeups, which are handled by an
// infinite loop anyway).
//
// Now let's suppose that "Thread 1" has acquired the lock and also wants to wait on the
// address 50. Following the same procedure as "Thread 2", our map now looks like:
//
// Address │
// │
// ────────────┼──────────────────────────────────────────────────────────────────────
// │
// │ ┌───────────────┐ ┌───────────────┐
// │ ┌─►│ ├───────►│ │
// │ │ │ Thread 2 │ │ Thread 1 │
// │ │ │ FutexWaiter │ │ FutexWaiter │
// 50 ├────┘ │ │ │ │
// │ │ │ │ │
// │ │ cond_var │ │ cond_var │
// │ │ waiting: true │◄───────┤ waiting: true │
// │ │ │ │ │
// │ └───────────────┘ └───────────────┘
// │
//
// Note how the head of our list contains the first waiter that was registered, while the
// tail of our list is our most recent waiter.
//
// Finally, after "Thread 1" sleeps, "Thread 3" has the opportunity to lock the global mutex.
// "Thread 3" will notify one waiter of the address 50 using the `cond_var` inside
// `FutexWaiter`, and will also remove it from the linked list. In this case,
// the notified thread is "Thread 2":
//
// Address │
// │
// ────────────┼──────────────────────────────────────────────────────────────────────
// │
// │ ┌────────────────┐ ┌────────────────┐
// │ │ │ ┌──►│ │
// │ │ Thread 2 │ │ │ Thread 1 │
// │ │ FutexWaiter │ │ │ FutexWaiter │
// 50 ├───┐ │ │ │ │ │
// │ │ │ │ │ │ │
// │ │ │ cond_var │ │ │ cond_var │
// │ │ │ waiting: false │ │ │ waiting: true │
// │ │ │ │ │ │ │
// │ │ └────────────────┘ │ └────────────────┘
// │ │ │
// │ └────────────────────────┘
// │
//
// Then, when the lock is released and "Thread 2" wakes up, it locks the global mutex again
// and checks whether `waiting` is still true, in which case it must manually remove itself
// from the queue. Here `waiting` is false, which doesn't require any other handling, so it
// just pops the `FutexWaiter` from its stack and returns `AtomicsWaitResult::Ok`.
//
// Address │
// │
// ────────────┼──────────────────────────────────────────────────────────────────────
// │
// │ ┌────────────────┐
// │ ┌──────────────────────────►│ │
// │ │ │ Thread 1 │
// │ │ │ FutexWaiter │
// 50 ├────┘ │ │
// │ │ │
// │ │ cond_var │
// │ │ waiting: true │
// │ │ │
// │ └────────────────┘
// │
// │
// │
//
// At some future point in time, "Thread 1" will be notified, which will proceed with the
// exact same steps as "Thread 2", emptying the wait queue and finishing the execution of our
// program.
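//
// In terms of the functions defined below, the interaction above boils down to
// calls like these (a sketch; `u32` stands in for any `Element` type, and
// `offset` must satisfy the safety requirements of `wait`):
//
//     // "Thread 1" and "Thread 2": block until notified or timed out.
//     let result = unsafe { wait::<u32>(&buffer, offset, expected, None)? };
//
//     // "Thread 3": wake up at most one waiter registered on `offset`.
//     let woken = notify(&buffer, offset, 1)?;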

#![deny(unsafe_op_in_unsafe_fn)]
#![deny(clippy::undocumented_unsafe_blocks)]
#![allow(clippy::expl_impl_clone_on_copy)]
#![allow(unstable_name_collisions)]

use std::{
cell::UnsafeCell,
sync::{atomic::Ordering, Condvar, Mutex},
};

use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListLink, UnsafeRef};
use sptr::Strict;

use crate::{
builtins::{
array_buffer::{utils::SliceRef, SharedArrayBuffer},
typed_array::Element,
},
small_map::{Entry, SmallMap},
sys::time::{Duration, Instant},
JsNativeError, JsResult,
};

/// Map of shared data addresses and its corresponding list of agents waiting on that location.
pub(crate) static CRITICAL_SECTION: Mutex<FutexWaiters> = Mutex::new(FutexWaiters {
@@ -110,3 +256,146 @@ impl FutexWaiters
}
}
}

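/// Result of a [`wait`] operation, corresponding to the "not-equal",
/// "timed-out" and "ok" completion values of `Atomics.wait` (see steps 15.b,
/// 22 and 23 in [`wait`] below).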
#[derive(Debug, Clone, Copy)]
pub(super) enum AtomicsWaitResult {
NotEqual,
TimedOut,
Ok,
}

/// Adds this agent to the wait queue for the address pointed to by `buffer[offset..]`.
///
/// # Safety
///
/// - `offset` must be a multiple of `std::mem::size_of::<E>()`.
/// - `buffer` must contain at least `std::mem::size_of::<E>()` bytes to read starting from `offset`.
// Our implementation guarantees that `SharedArrayBuffer` is always aligned to `u64` at minimum.
pub(super) unsafe fn wait<E: Element + PartialEq>(
buffer: &SharedArrayBuffer,
offset: usize,
check: E,
timeout: Option<Duration>,
) -> JsResult<AtomicsWaitResult> {
// 10. Let block be buffer.[[ArrayBufferData]].
// 11. Let WL be GetWaiterList(block, indexedPosition).
// 12. Perform EnterCriticalSection(WL).
let mut waiters = CRITICAL_SECTION.lock().map_err(|_| {
// avoids exposing internals of our implementation.
JsNativeError::typ().with_message("failed to synchronize with the agent cluster")
})?;

let time_info = timeout.map(|timeout| (Instant::now(), timeout));

let buffer = &buffer.data()[offset..];
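// `buffer` now starts at the waited-on location; its base address will be the
// key into the wait queue map.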

// 13. Let elementType be TypedArrayElementType(typedArray).
// 14. Let w be GetValueFromBuffer(buffer, indexedPosition, elementType, true, SeqCst).

// SAFETY: The safety of this operation is guaranteed by the caller.
let value = unsafe { E::read(SliceRef::AtomicSlice(buffer)).load(Ordering::SeqCst) };

// 15. If v ≠ w, then
// a. Perform LeaveCriticalSection(WL).
// b. Return "not-equal".
if check != value {
return Ok(AtomicsWaitResult::NotEqual);
}

// 16. Let W be AgentSignifier().
// 17. Perform AddWaiter(WL, W).

// ensure we can have aliased pointers to the waiter in a sound way.
let waiter = UnsafeCell::new(FutexWaiter::default());
let waiter_ptr = waiter.get();

// SAFETY: waiter is valid and we call `remove_waiter` below.
unsafe {
waiters.add_waiter(waiter_ptr, buffer.as_ptr().addr());
}

// 18. Let notified be SuspendAgent(WL, W, t).

// `SuspendAgent(WL, W, t)`
// https://tc39.es/ecma262/#sec-suspendthisagent

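// The loop below sleeps on `cond_var` until a notifier flips `waiting` to
// `false` or the timeout (if any) expires; a spurious wakeup simply re-enters
// the loop and checks `waiting` again.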
let result = loop {
// SAFETY: waiter is still valid
if unsafe { !(*waiter_ptr).waiting } {
break AtomicsWaitResult::Ok;
}

if let Some((start, timeout)) = time_info {
let Some(remaining) = timeout.checked_sub(start.elapsed()) else {
break AtomicsWaitResult::TimedOut;
};

// If the wait fails, the mutex is poisoned, so `waiter` cannot be read from
// other threads anymore, meaning we can return directly.
// This doesn't use `wait_timeout_while` because that would have to mutably borrow
// `waiter`, which would be unsound since we have pointers to that location while
// the borrow is active.
// SAFETY: waiter is still valid
waiters = unsafe {
(*waiter_ptr)
.cond_var
.wait_timeout(waiters, remaining)
.map_err(|_| {
JsNativeError::typ()
.with_message("failed to synchronize with the agent cluster")
})?
.0
};
} else {
// SAFETY: waiter is still valid
waiters = unsafe {
(*waiter_ptr).cond_var.wait(waiters).map_err(|_| {
JsNativeError::typ()
.with_message("failed to synchronize with the agent cluster")
})?
};
}
};

// SAFETY: waiter is valid and contained in its waiter list if `waiting == true`.
unsafe {
// 20. Else,
// a. Perform RemoveWaiter(WL, W).
if (*waiter_ptr).waiting {
waiters.remove_waiter(waiter_ptr);
} else {
// 19. If notified is true, then
// a. Assert: W is not on the list of waiters in WL.
debug_assert!(!(*waiter_ptr).link.is_linked());
}
}

// 21. Perform LeaveCriticalSection(WL).
drop(waiters);

// 22. If notified is true, return "ok".
// 23. Return "timed-out".
Ok(result)
}

/// Notifies at most `count` agents waiting on the memory address pointed to by `buffer[offset..]`.
pub(super) fn notify(buffer: &SharedArrayBuffer, offset: usize, count: u64) -> JsResult<u64> {
let addr = buffer.data()[offset..].as_ptr().addr();

// 7. Let WL be GetWaiterList(block, indexedPosition).
// 8. Perform EnterCriticalSection(WL).
let mut waiters = CRITICAL_SECTION.lock().map_err(|_| {
// avoids exposing internals of our implementation.
JsNativeError::typ().with_message("failed to synchronize with the agent cluster")
})?;

// 9. Let S be RemoveWaiters(WL, c).
// 10. For each element W of S, do
// a. Perform NotifyWaiter(WL, W).
let count = waiters.notify_many(addr, count);

// 11. Perform LeaveCriticalSection(WL).
drop(waiters);

Ok(count)
}
