Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Priority queue + thread locals #19

Merged
merged 11 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ samp = { git = "https://github.com/ZOTTCE/samp-rs.git", rev = "2e9192713bf9a77ee
slab = "0.4.2"
log = "0.4.6"
fern = "0.5.9"
priority-queue = "1.3.0"
once_cell = "1.19.0"
fnv = "1.0.7"

[profile.release]
lto = true
lto = true
178 changes: 145 additions & 33 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,51 @@
#![warn(clippy::pedantic)]
use amx_arguments::VariadicAmxArguments;
use log::info;
use log::{error, info};
use priority_queue::PriorityQueue;
use samp::amx::{Amx, AmxIdent};
use samp::cell::AmxString;
use samp::error::{AmxError, AmxResult};
use samp::plugin::SampPlugin;
use slab::Slab;
use std::cell::RefCell;
use std::cmp::Reverse;
use std::convert::TryFrom;
use std::time::{Duration, Instant};
use timer::{Timer, TimerStaus};

use timer::Timer;
mod amx_arguments;
mod timer;

/// The plugin and its data: a list of scheduled timers
struct PreciseTimers {
timers: Slab<Timer>,
thread_local! {
static TIMERS: RefCell<Slab<Timer>> = RefCell::new(Slab::with_capacity(1000));
static QUEUE: RefCell<PriorityQueue<usize, Reverse<TimerScheduling>, fnv::FnvBuildHasher>> = RefCell::new(PriorityQueue::with_capacity_and_default_hasher(1000));
}

/// The plugin
struct PreciseTimers;

#[derive(Clone)]
struct TimerScheduling {
interval: Option<Duration>,
execution_forbidden: bool,
next_trigger: Instant,
}

impl PartialEq for TimerScheduling {
fn eq(&self, other: &Self) -> bool {
self.next_trigger == other.next_trigger
}
}
impl Eq for TimerScheduling {}

impl PartialOrd for TimerScheduling {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
self.next_trigger.partial_cmp(&other.next_trigger)
}
}
impl Ord for TimerScheduling {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.next_trigger.cmp(&other.next_trigger)
}
}

impl PreciseTimers {
Expand All @@ -38,18 +68,26 @@ impl PreciseTimers {

// Find the callback by name and save its index
let amx_callback_index = amx.find_public(&callback_name.to_string())?;
let next_trigger = Instant::now() + interval;

let timer = Timer {
next_trigger: Instant::now() + interval,
interval: if repeat { Some(interval) } else { None },
passed_arguments,
amx_identifier: AmxIdent::from(amx.amx().as_ptr()),
amx_callback_index,
scheduled_for_removal: false,
};

// Add the timer to the list. This is safe for Slab::retain() even if SetPreciseTimer was called from a timer's callback.
let key: usize = self.timers.insert(timer);
let key: usize = TIMERS.with_borrow_mut(|t| t.insert(timer));
QUEUE.with_borrow_mut(|q| {
q.push(
key,
Reverse(TimerScheduling {
next_trigger,
interval: if repeat { Some(interval) } else { None },
execution_forbidden: false,
}),
)
});
// The timer's slot in Slab<> incresed by 1, so that 0 signifies an invalid timer in PAWN
let timer_number = key
.checked_add(1)
Expand All @@ -66,10 +104,15 @@ impl PreciseTimers {
#[allow(clippy::unnecessary_wraps)]
#[samp::native(name = "DeletePreciseTimer")]
pub fn delete(&mut self, _: &Amx, timer_number: usize) -> AmxResult<i32> {
// Subtract 1 from the passed timer_number (where 0=invalid) to get the actual Slab<> slot
if let Some(timer) = self.timers.get_mut(timer_number - 1) {
// We defer the removal so that we don't mess up the process_tick()->retain() iterator.
timer.scheduled_for_removal = true;
let key = timer_number - 1;
if QUEUE
.try_with(|q| {
q.borrow_mut().change_priority_by(&key, |scheduling| {
scheduling.0.execution_forbidden = true;
})
})
.map_err(|_| AmxError::MemoryAccess)?
{
Ok(1)
} else {
Ok(0)
Expand All @@ -89,13 +132,25 @@ impl PreciseTimers {
interval: i32,
repeat: bool,
) -> AmxResult<i32> {
if let Some(timer) = self.timers.get_mut(timer_number - 1) {
let interval = u64::try_from(interval)
.map(Duration::from_millis)
.or(Err(AmxError::Params))?;
let key = timer_number - 1;
let interval = u64::try_from(interval)
.map(Duration::from_millis)
.or(Err(AmxError::Params))?;

timer.next_trigger = Instant::now() + interval;
timer.interval = if repeat { Some(interval) } else { None };
if QUEUE
.try_with(|q| {
q.borrow_mut().change_priority(
&key,
Reverse(TimerScheduling {
next_trigger: Instant::now() + interval,
interval: if repeat { Some(interval) } else { None },
execution_forbidden: false,
}),
)
})
.map_err(|_| AmxError::MemoryAccess)?
.is_some()
{
Ok(1)
} else {
Ok(0)
Expand All @@ -105,7 +160,7 @@ impl PreciseTimers {

impl SampPlugin for PreciseTimers {
fn on_load(&mut self) {
info!("net4game.com/samp-precise-timers by Brian Misiak loaded correctly.");
info!("samp-precise-timers 3 by Brian Misiak loaded correctly.");
}

#[allow(clippy::inline_always)]
Expand All @@ -114,18 +169,77 @@ impl SampPlugin for PreciseTimers {
// Rust's Instant is monotonic and nondecreasing, even during NTP time adjustment.
let now = Instant::now();

// 💀⚠ Because of FFI with C, Rust can't notice the simultaneous mutation of self.timers, but the iterator could get messed up in case of
// Slab::retain() -> Timer::trigger() -> PAWN callback/ffi which calls DeletePreciseTimer() -> Slab::remove.
// That's why the DeletePreciseTimer() schedules timers for deletion instead of doing it right away.
// Slab::retain() is, however, okay with inserting new timers during its execution, even in case of reallocation when over capacity.
self.timers.retain(|_key: usize, timer| {
timer.trigger_if_due(now) == TimerStaus::MightTriggerInTheFuture
});
loop {
let triggered_timer = QUEUE.with_borrow(|q| match q.peek() {
Some((
&key,
&Reverse(TimerScheduling {
next_trigger,
interval,
execution_forbidden,
}),
)) if next_trigger <= now => Some((key, interval, execution_forbidden)),
_ => None,
});
let Some((key, interval, execution_forbidden)) = triggered_timer else {
break;
};

if let (Some(interval), false) = (interval, execution_forbidden) {
let next_trigger = now + interval;
QUEUE.with_borrow_mut(|q| {
q.change_priority(
&key,
Reverse(TimerScheduling {
next_trigger,
execution_forbidden,
interval: Some(interval),
}),
)
.expect("failed to reschedule repeating timer");
});

TIMERS.with_borrow_mut(|t| {
let timer = t.get_mut(key).expect("slab should contain repeating timer");

if let Err(err) = timer.execute_pawn_callback() {
error!("Error executing repeating timer callback: {}", err);
}
});
} else {
// Must pop before the timer is executed, so that
// it can't schedule anything as the very next timer before
// we have a chance to pop from the queue.
let (popped_key, _) = QUEUE
.with_borrow_mut(|q| q.pop())
.expect("priority queue should have at least the timer we peeked");
assert_eq!(popped_key, key);
let mut removed_timer = TIMERS.with_borrow_mut(|t| t.remove(key));

if !execution_forbidden {
if let Err(err) = removed_timer.execute_pawn_callback() {
error!("Error executing non-repeating timer callback: {}", err);
}
}
}
}
}

fn on_amx_unload(&mut self, unloaded_amx: &Amx) {
self.timers
.retain(|_, timer| !timer.was_scheduled_by_amx(unloaded_amx));
let mut removed_timers = Vec::new();
TIMERS.with_borrow_mut(|t| {
t.retain(|key, timer| {
if timer.was_scheduled_by_amx(unloaded_amx) {
removed_timers.push(key);
false
} else {
true
}
})
});
for key in removed_timers {
QUEUE.with_borrow_mut(|q| q.remove(&key));
}
}
}

Expand All @@ -148,8 +262,6 @@ samp::initialize_plugin!(
.chain(samp_logger)
.apply();

PreciseTimers {
timers: Slab::with_capacity(1000)
}
PreciseTimers
}
);
Loading
Loading