-
Notifications
You must be signed in to change notification settings - Fork 0
Lock
Celve edited this page Jan 21, 2024
·
2 revisions
The spin lock is implemented with AtomicBool
in Rust. The code is listed as follows.
#[derive(Default, Debug)]
pub struct Spin<T> {
lock: AtomicBool,
data: UnsafeCell<T>,
}
pub struct SpinGuard<'a, T: 'a> {
lock: &'a AtomicBool,
spin: &'a Spin<T>,
data: &'a mut T,
}
impl<T> Spin<T> {
pub const fn new(data: T) -> Self {
Self {
lock: AtomicBool::new(false),
data: UnsafeCell::new(data),
}
}
}
unsafe impl<T: Send> Sync for Spin<T> {}
unsafe impl<T: Send> Send for Spin<T> {}
impl<T> Spin<T> {
pub fn lock(&self) -> SpinGuard<T> {
while self
.lock
.compare_exchange(false, true, Ordering::Acquire, Ordering::Acquire)
.is_err()
{}
SpinGuard::new(&self, unsafe { &mut *self.data.get() }) // bypass mutability check
}
pub fn try_lock(&self) -> Option<SpinGuard<T>> {
if self
.lock
.compare_exchange(false, true, Ordering::Acquire, Ordering::Acquire)
.is_ok()
{
Some(SpinGuard::new(&self, unsafe { &mut *self.data.get() }))
} else {
None
}
}
}
impl<'a, T: 'a> SpinGuard<'a, T> {
pub fn new(spin: &'a Spin<T>, data: &'a mut T) -> Self {
Self {
lock: &spin.lock,
spin,
data,
}
}
pub fn spin(&self) -> &'a Spin<T> {
self.spin
}
}
The Mutex
used in rust contains a BlockLock
, which is used to represent a kind lock that would suspend the thread when it fails to acquire the lock. The code is listed as follows.
pub struct BlockLock {
lock: AtomicBool, // actually, it doesn't need atomic
queue: Spin<WaitingQueue>,
}
impl BlockLock {
pub fn lock(&self) {
while self
.lock
.compare_exchange(false, true, Ordering::Acquire, Ordering::Acquire)
.is_err()
{
{
let task = Processor::curr_task();
self.queue.lock().push(&task);
}
Processor::suspend();
}
}
pub fn unlock(&self) {
self.lock.store(false, Ordering::Release);
let task = self.queue.lock().pop();
if let Some(task) = task {
task.wakeup();
}
}
pub fn is_locked(&self) -> bool {
self.lock.load(Ordering::Acquire)
}
}
The WaitingQueue
used inside is the data structure that stores the blocked thread:
waitings: Vec<Weak<Task>>,
}
impl WaitingQueue {
pub const fn new() -> Self {
Self {
waitings: Vec::new(),
}
}
pub fn push(&mut self, task: &Arc<Task>) {
self.waitings.push(task.phantom());
}
pub fn pop(&mut self) -> Option<Arc<Task>> {
loop {
let opt_task = self.waitings.pop();
if let Some(weak_task) = opt_task {
if let Some(arc_task) = weak_task.upgrade() {
break Some(arc_task);
}
} else {
break None;
}
}
}
}
The implementation of conditional variable is simple, which records a list of threads that are waiting on the conditional variable and a lock for it, which could be Mutex
or Spin
.
pub struct Condvar {
inner: Observable,
}
impl Condvar {
pub fn wait_mutex<'a, T>(&'a self, guard: MutexGuard<'a, T>) -> MutexGuard<T> {
let lock = guard.mutex();
drop(guard);
self.inner.wait(Processor::curr_task());
lock.lock()
}
pub fn wait_spin<'a, T>(&'a self, guard: SpinGuard<'a, T>) -> SpinGuard<T> {
let lock = guard.spin();
drop(guard);
self.inner.wait(Processor::curr_task());
lock.lock()
}
pub fn wait_mcs<'a, T>(&'a self, guard: McsGuard<'a, T>) -> McsGuard<T> {
let lock = guard.mcs();
drop(guard);
self.inner.wait(Processor::curr_task());
lock.lock()
}
pub fn notify_one(&self) {
self.inner.notify_one();
}
pub fn notify_all(&self) {
self.inner.notify_all();
}
}
And two methods are supported to notify threads to wake up.