Skip to content

Commit

Permalink
Priority queue + thread locals (#19)
Browse files Browse the repository at this point in the history
* priority queue work

* working priority queue

* fnv

* lint

* more lint

* timer removal fix

* retained

* TimerScheduling

* remove timetobeexecuted

* thread_local baby

* v3
  • Loading branch information
bmisiak authored Feb 21, 2024
1 parent 2a764bc commit 7244f7c
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 89 deletions.
45 changes: 43 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ samp = { git = "https://github.com/ZOTTCE/samp-rs.git", rev = "2e9192713bf9a77ee
slab = "0.4.2"
log = "0.4.6"
fern = "0.5.9"
priority-queue = "1.3.0"
once_cell = "1.19.0"
fnv = "1.0.7"

[profile.release]
lto = true
lto = true
178 changes: 145 additions & 33 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,51 @@
#![warn(clippy::pedantic)]
use amx_arguments::VariadicAmxArguments;
use log::info;
use log::{error, info};
use priority_queue::PriorityQueue;
use samp::amx::{Amx, AmxIdent};
use samp::cell::AmxString;
use samp::error::{AmxError, AmxResult};
use samp::plugin::SampPlugin;
use slab::Slab;
use std::cell::RefCell;
use std::cmp::Reverse;
use std::convert::TryFrom;
use std::time::{Duration, Instant};
use timer::{Timer, TimerStaus};

use timer::Timer;
mod amx_arguments;
mod timer;

/// The plugin and its data: a list of scheduled timers
struct PreciseTimers {
timers: Slab<Timer>,
thread_local! {
static TIMERS: RefCell<Slab<Timer>> = RefCell::new(Slab::with_capacity(1000));
static QUEUE: RefCell<PriorityQueue<usize, Reverse<TimerScheduling>, fnv::FnvBuildHasher>> = RefCell::new(PriorityQueue::with_capacity_and_default_hasher(1000));
}

/// The plugin
struct PreciseTimers;

#[derive(Clone)]
struct TimerScheduling {
interval: Option<Duration>,
execution_forbidden: bool,
next_trigger: Instant,
}

impl PartialEq for TimerScheduling {
fn eq(&self, other: &Self) -> bool {
self.next_trigger == other.next_trigger
}
}
impl Eq for TimerScheduling {}

impl PartialOrd for TimerScheduling {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
self.next_trigger.partial_cmp(&other.next_trigger)
}
}
impl Ord for TimerScheduling {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.next_trigger.cmp(&other.next_trigger)
}
}

impl PreciseTimers {
Expand All @@ -38,18 +68,26 @@ impl PreciseTimers {

// Find the callback by name and save its index
let amx_callback_index = amx.find_public(&callback_name.to_string())?;
let next_trigger = Instant::now() + interval;

let timer = Timer {
next_trigger: Instant::now() + interval,
interval: if repeat { Some(interval) } else { None },
passed_arguments,
amx_identifier: AmxIdent::from(amx.amx().as_ptr()),
amx_callback_index,
scheduled_for_removal: false,
};

// Add the timer to the list. This is safe for Slab::retain() even if SetPreciseTimer was called from a timer's callback.
let key: usize = self.timers.insert(timer);
let key: usize = TIMERS.with_borrow_mut(|t| t.insert(timer));
QUEUE.with_borrow_mut(|q| {
q.push(
key,
Reverse(TimerScheduling {
next_trigger,
interval: if repeat { Some(interval) } else { None },
execution_forbidden: false,
}),
)
});
// The timer's slot in Slab<> incresed by 1, so that 0 signifies an invalid timer in PAWN
let timer_number = key
.checked_add(1)
Expand All @@ -66,10 +104,15 @@ impl PreciseTimers {
#[allow(clippy::unnecessary_wraps)]
#[samp::native(name = "DeletePreciseTimer")]
pub fn delete(&mut self, _: &Amx, timer_number: usize) -> AmxResult<i32> {
// Subtract 1 from the passed timer_number (where 0=invalid) to get the actual Slab<> slot
if let Some(timer) = self.timers.get_mut(timer_number - 1) {
// We defer the removal so that we don't mess up the process_tick()->retain() iterator.
timer.scheduled_for_removal = true;
let key = timer_number - 1;
if QUEUE
.try_with(|q| {
q.borrow_mut().change_priority_by(&key, |scheduling| {
scheduling.0.execution_forbidden = true;
})
})
.map_err(|_| AmxError::MemoryAccess)?
{
Ok(1)
} else {
Ok(0)
Expand All @@ -89,13 +132,25 @@ impl PreciseTimers {
interval: i32,
repeat: bool,
) -> AmxResult<i32> {
if let Some(timer) = self.timers.get_mut(timer_number - 1) {
let interval = u64::try_from(interval)
.map(Duration::from_millis)
.or(Err(AmxError::Params))?;
let key = timer_number - 1;
let interval = u64::try_from(interval)
.map(Duration::from_millis)
.or(Err(AmxError::Params))?;

timer.next_trigger = Instant::now() + interval;
timer.interval = if repeat { Some(interval) } else { None };
if QUEUE
.try_with(|q| {
q.borrow_mut().change_priority(
&key,
Reverse(TimerScheduling {
next_trigger: Instant::now() + interval,
interval: if repeat { Some(interval) } else { None },
execution_forbidden: false,
}),
)
})
.map_err(|_| AmxError::MemoryAccess)?
.is_some()
{
Ok(1)
} else {
Ok(0)
Expand All @@ -105,7 +160,7 @@ impl PreciseTimers {

impl SampPlugin for PreciseTimers {
fn on_load(&mut self) {
info!("net4game.com/samp-precise-timers by Brian Misiak loaded correctly.");
info!("samp-precise-timers 3 by Brian Misiak loaded correctly.");
}

#[allow(clippy::inline_always)]
Expand All @@ -114,18 +169,77 @@ impl SampPlugin for PreciseTimers {
// Rust's Instant is monotonic and nondecreasing, even during NTP time adjustment.
let now = Instant::now();

// 💀⚠ Because of FFI with C, Rust can't notice the simultaneous mutation of self.timers, but the iterator could get messed up in case of
// Slab::retain() -> Timer::trigger() -> PAWN callback/ffi which calls DeletePreciseTimer() -> Slab::remove.
// That's why the DeletePreciseTimer() schedules timers for deletion instead of doing it right away.
// Slab::retain() is, however, okay with inserting new timers during its execution, even in case of reallocation when over capacity.
self.timers.retain(|_key: usize, timer| {
timer.trigger_if_due(now) == TimerStaus::MightTriggerInTheFuture
});
loop {
let triggered_timer = QUEUE.with_borrow(|q| match q.peek() {
Some((
&key,
&Reverse(TimerScheduling {
next_trigger,
interval,
execution_forbidden,
}),
)) if next_trigger <= now => Some((key, interval, execution_forbidden)),
_ => None,
});
let Some((key, interval, execution_forbidden)) = triggered_timer else {
break;
};

if let (Some(interval), false) = (interval, execution_forbidden) {
let next_trigger = now + interval;
QUEUE.with_borrow_mut(|q| {
q.change_priority(
&key,
Reverse(TimerScheduling {
next_trigger,
execution_forbidden,
interval: Some(interval),
}),
)
.expect("failed to reschedule repeating timer");
});

TIMERS.with_borrow_mut(|t| {
let timer = t.get_mut(key).expect("slab should contain repeating timer");

if let Err(err) = timer.execute_pawn_callback() {
error!("Error executing repeating timer callback: {}", err);
}
});
} else {
// Must pop before the timer is executed, so that
// it can't schedule anything as the very next timer before
// we have a chance to pop from the queue.
let (popped_key, _) = QUEUE
.with_borrow_mut(|q| q.pop())
.expect("priority queue should have at least the timer we peeked");
assert_eq!(popped_key, key);
let mut removed_timer = TIMERS.with_borrow_mut(|t| t.remove(key));

if !execution_forbidden {
if let Err(err) = removed_timer.execute_pawn_callback() {
error!("Error executing non-repeating timer callback: {}", err);
}
}
}
}
}

fn on_amx_unload(&mut self, unloaded_amx: &Amx) {
self.timers
.retain(|_, timer| !timer.was_scheduled_by_amx(unloaded_amx));
let mut removed_timers = Vec::new();
TIMERS.with_borrow_mut(|t| {
t.retain(|key, timer| {
if timer.was_scheduled_by_amx(unloaded_amx) {
removed_timers.push(key);
false
} else {
true
}
})
});
for key in removed_timers {
QUEUE.with_borrow_mut(|q| q.remove(&key));
}
}
}

Expand All @@ -148,8 +262,6 @@ samp::initialize_plugin!(
.chain(samp_logger)
.apply();

PreciseTimers {
timers: Slab::with_capacity(1000)
}
PreciseTimers
}
);
Loading

0 comments on commit 7244f7c

Please sign in to comment.