Skip to content
Celve edited this page Jan 21, 2024 · 2 revisions

Lock

Spin Lock

The spin lock is implemented in Rust on top of an `AtomicBool` flag. The code is listed below.

/// A spin lock that owns a value of type `T`.
///
/// The value is only handed out through the guards returned by `lock` and
/// `try_lock`, which first flip the `lock` flag from `false` to `true`.
#[derive(Default, Debug)]
pub struct Spin<T> {
    /// Lock flag: `false` when free, set to `true` by a successful acquire.
    lock: AtomicBool,
    /// The protected value; `UnsafeCell` allows handing out `&mut T` from `&Spin<T>`.
    data: UnsafeCell<T>,
}

/// Handle returned by `Spin::lock` / `Spin::try_lock` that carries access to
/// the protected value.
///
/// NOTE(review): no `Drop`/`Deref` impls are visible in this excerpt;
/// presumably the guard clears the lock flag when dropped — confirm.
pub struct SpinGuard<'a, T: 'a> {
    /// The flag of the originating lock (set from `spin.lock` in `SpinGuard::new`).
    lock: &'a AtomicBool,
    /// The spin lock this guard was created from; exposed through `spin()`.
    spin: &'a Spin<T>,
    /// Mutable reference to the value stored inside the spin lock.
    data: &'a mut T,
}

impl<T> Spin<T> {
    pub const fn new(data: T) -> Self {
        Self {
            lock: AtomicBool::new(false),
            data: UnsafeCell::new(data),
        }
    }
}

// SAFETY: `Spin<T>` only exposes its `UnsafeCell<T>` contents through guards
// obtained after winning the compare-exchange on `lock`, so shared references
// to the lock can be used from several threads as long as the value itself
// may be moved between threads (`T: Send`).
// NOTE(review): this relies on the guard releasing the flag correctly; the
// release path is not visible in this excerpt — confirm.
unsafe impl<T: Send> Sync for Spin<T> {}
unsafe impl<T: Send> Send for Spin<T> {}

impl<T> Spin<T> {
    /// Acquires the lock, busy-waiting until the flag can be flipped from
    /// `false` to `true`.
    ///
    /// Returns a [`SpinGuard`] carrying a mutable reference to the protected
    /// value.
    pub fn lock(&self) -> SpinGuard<'_, T> {
        // `compare_exchange_weak` may fail spuriously, which is harmless in a
        // retry loop. A failed attempt acquires nothing, so `Relaxed` is
        // sufficient for the failure ordering.
        while self
            .lock
            .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            // Test-and-test-and-set: wait on a plain load so that contended
            // waiters do not keep issuing atomic writes, and tell the CPU we
            // are in a busy-wait loop.
            while self.lock.load(Ordering::Relaxed) {
                core::hint::spin_loop();
            }
        }
        // SAFETY: the flag was just changed from `false` to `true` by this
        // caller, so no other guard created through `lock`/`try_lock` can be
        // handing out a reference to `data` at the same time.
        SpinGuard::new(self, unsafe { &mut *self.data.get() })
    }

    /// Tries to acquire the lock without waiting.
    ///
    /// Returns `Some(guard)` if the flag was free and has now been taken, or
    /// `None` if the lock is currently held.
    pub fn try_lock(&self) -> Option<SpinGuard<'_, T>> {
        if self
            .lock
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            // SAFETY: same argument as in `lock` — the successful
            // compare-exchange makes this caller the only holder.
            Some(SpinGuard::new(self, unsafe { &mut *self.data.get() }))
        } else {
            None
        }
    }
}

impl<'a, T: 'a> SpinGuard<'a, T> {
    pub fn new(spin: &'a Spin<T>, data: &'a mut T) -> Self {
        Self {
            lock: &spin.lock,
            spin,
            data,
        }
    }

    pub fn spin(&self) -> &'a Spin<T> {
        self.spin
    }
}

Mutex

The Mutex is built on a BlockLock, a kind of lock that suspends the calling thread when it fails to acquire the lock instead of busy-waiting. The code is listed below.

/// A lock that suspends the current task instead of spinning when it is
/// already held.
pub struct BlockLock {
    /// Lock flag: `false` when free, `true` while held.
    // NOTE(review): `lock()` performs its compare-exchange on this flag
    // without holding `queue`, so the atomic appears to be required — confirm.
    lock: AtomicBool, // actually, it doesn't need atomic
    /// Tasks that failed to acquire the lock and are waiting to be woken up.
    queue: Spin<WaitingQueue>,
}

impl BlockLock {
    /// Acquires the lock, suspending the current task for as long as the lock
    /// is held by someone else.
    ///
    /// The flag is re-checked while holding the waiting-queue spin lock before
    /// the task is enqueued. Together with `unlock` clearing the flag under
    /// the same spin lock, this closes the window in which a waiter could
    /// enqueue itself *after* the holder had already released the lock and
    /// found the queue empty (a lost wakeup).
    pub fn lock(&self) {
        let try_acquire = || {
            self.lock
                .compare_exchange(false, true, Ordering::Acquire, Ordering::Acquire)
                .is_ok()
        };

        loop {
            // Fast path: uncontended acquire without touching the queue.
            if try_acquire() {
                return;
            }
            {
                let task = Processor::curr_task();
                let mut queue = self.queue.lock();
                // Re-check under the queue lock: if the holder released in the
                // meantime, take the lock instead of going to sleep.
                if try_acquire() {
                    return;
                }
                queue.push(&task);
                // `queue` and `task` are dropped here, before suspending.
            }
            // NOTE(review): a wakeup issued between releasing the queue and
            // this call must not be lost by `Processor::suspend` — confirm.
            Processor::suspend();
        }
    }

    /// Releases the lock and wakes up one waiting task, if any.
    pub fn unlock(&self) {
        // Clear the flag and pick the next waiter atomically with respect to
        // waiters that are about to enqueue themselves.
        let mut queue = self.queue.lock();
        self.lock.store(false, Ordering::Release);
        let task = queue.pop();
        drop(queue);

        // Wake the waiter outside of the queue lock.
        if let Some(task) = task {
            task.wakeup();
        }
    }

    /// Returns whether the lock flag is currently set.
    ///
    /// The result is only a snapshot and may be stale by the time it is used.
    pub fn is_locked(&self) -> bool {
        self.lock.load(Ordering::Acquire)
    }
}

The WaitingQueue used inside is the data structure that stores the blocked threads:

    waitings: Vec<Weak<Task>>,
}

impl WaitingQueue {
    pub const fn new() -> Self {
        Self {
            waitings: Vec::new(),
        }
    }

    pub fn push(&mut self, task: &Arc<Task>) {
        self.waitings.push(task.phantom());
    }

    pub fn pop(&mut self) -> Option<Arc<Task>> {
        loop {
            let opt_task = self.waitings.pop();
            if let Some(weak_task) = opt_task {
                if let Some(arc_task) = weak_task.upgrade() {
                    break Some(arc_task);
                }
            } else {
                break None;
            }
        }
    }
}

Conditional Variable

The implementation of the condition variable is simple: it records the list of threads that are waiting on it, and each wait method takes the guard of the lock associated with the condition, which can be a Mutex, a Spin, or an MCS lock.

/// Condition variable on which tasks can wait until they are notified.
pub struct Condvar {
    /// Performs the actual waiting and notification (`wait`, `notify_one`,
    /// `notify_all` are forwarded to it).
    inner: Observable,
}

impl Condvar {
    /// Gives up `guard`, waits on this condition variable, then locks the
    /// same mutex again and returns the new guard.
    ///
    /// NOTE(review): the guard is dropped before the current task is handed
    /// to `inner.wait`; a notification sent in between might be missed —
    /// confirm how `Observable::wait` handles this.
    pub fn wait_mutex<'a, T>(&'a self, guard: MutexGuard<'a, T>) -> MutexGuard<T> {
        let mutex = guard.mutex();
        drop(guard);
        let current = Processor::curr_task();
        self.inner.wait(current);
        mutex.lock()
    }

    /// Same as `wait_mutex`, but for a guard obtained from a spin lock.
    pub fn wait_spin<'a, T>(&'a self, guard: SpinGuard<'a, T>) -> SpinGuard<T> {
        let spin = guard.spin();
        drop(guard);
        let current = Processor::curr_task();
        self.inner.wait(current);
        spin.lock()
    }

    /// Same as `wait_mutex`, but for a guard obtained from an MCS lock.
    pub fn wait_mcs<'a, T>(&'a self, guard: McsGuard<'a, T>) -> McsGuard<T> {
        let mcs = guard.mcs();
        drop(guard);
        let current = Processor::curr_task();
        self.inner.wait(current);
        mcs.lock()
    }

    /// Forwards to `inner.notify_one()`.
    pub fn notify_one(&self) {
        self.inner.notify_one();
    }

    /// Forwards to `inner.notify_all()`.
    pub fn notify_all(&self) {
        self.inner.notify_all();
    }
}

Two methods, `notify_one` and `notify_all`, are provided to wake up threads waiting on the condition variable.

Clone this wiki locally