diff --git a/benches/main.rs b/benches/main.rs index 9761f87..6f6bc2c 100644 --- a/benches/main.rs +++ b/benches/main.rs @@ -29,7 +29,7 @@ fn run_basic_subscriber(c: &mut Criterion) { OwnedFd::from_raw_fd(raw_fd) }; - event_manager.add(event_fd.as_fd(),EventSet::IN | EventSet::ERROR | EventSet::HANG_UP,Box::new(move |_:&mut EventManager, event_set: EventSet| { + event_manager.add(event_fd.as_fd(), EventSet::IN | EventSet::ERROR | EventSet::HANG_UP, Box::new(move |_:&mut EventManager<()>, event_set: EventSet| { match event_set { EventSet::IN => (), EventSet::ERROR => { @@ -47,9 +47,10 @@ fn run_basic_subscriber(c: &mut Criterion) { event_fd }).collect::>(); + let expected = vec![(); usize::try_from(no_of_subscribers).unwrap()]; c.bench_function("process_basic", |b| { b.iter(|| { - assert_eq!(event_manager.wait(Some(0)), Ok(no_of_subscribers)); + assert_eq!(event_manager.wait(Some(0)), Ok(expected.as_slice())); }) }); @@ -75,7 +76,7 @@ fn run_arc_mutex_subscriber(c: &mut Criterion) { let counter = Arc::new(Mutex::new(0u64)); let counter_clone = counter.clone(); - event_manager.add(event_fd.as_fd(),EventSet::IN | EventSet::ERROR | EventSet::HANG_UP,Box::new(move |_:&mut EventManager, event_set: EventSet| { + event_manager.add(event_fd.as_fd(),EventSet::IN | EventSet::ERROR | EventSet::HANG_UP,Box::new(move |_:&mut EventManager<()>, event_set: EventSet| { match event_set { EventSet::IN => { *counter_clone.lock().unwrap() += 1; @@ -95,9 +96,10 @@ fn run_arc_mutex_subscriber(c: &mut Criterion) { (event_fd,counter) }).collect::>(); + let expected = vec![(); usize::try_from(no_of_subscribers).unwrap()]; c.bench_function("process_with_arc_mutex", |b| { b.iter(|| { - assert_eq!(event_manager.wait(Some(0)), Ok(no_of_subscribers)); + assert_eq!(event_manager.wait(Some(0)), Ok(expected.as_slice())); }) }); @@ -124,7 +126,7 @@ fn run_subscriber_with_inner_mut(c: &mut Criterion) { let counter = Arc::new(AtomicU64::new(0)); let counter_clone = counter.clone(); - event_manager.add(event_fd.as_fd(),EventSet::IN | EventSet::ERROR | EventSet::HANG_UP,Box::new(move |_:&mut EventManager, event_set: EventSet| { + event_manager.add(event_fd.as_fd(),EventSet::IN | EventSet::ERROR | EventSet::HANG_UP,Box::new(move |_:&mut EventManager<()>, event_set: EventSet| { match event_set { EventSet::IN => { counter_clone.fetch_add(1, Ordering::SeqCst); @@ -144,9 +146,10 @@ fn run_subscriber_with_inner_mut(c: &mut Criterion) { (event_fd,counter) }).collect::>(); + let expected = vec![(); usize::try_from(no_of_subscribers).unwrap()]; c.bench_function("process_with_inner_mut", |b| { b.iter(|| { - assert_eq!(event_manager.wait(Some(0)), Ok(no_of_subscribers)); + assert_eq!(event_manager.wait(Some(0)), Ok(expected.as_slice())); }) }); @@ -177,7 +180,7 @@ fn run_multiple_subscriber_types(c: &mut Criterion) { let counter = Arc::new(AtomicU64::new(0)); let counter_clone = counter.clone(); - event_manager.add(event_fd.as_fd(),EventSet::IN | EventSet::ERROR | EventSet::HANG_UP,Box::new(move |_:&mut EventManager, event_set: EventSet| { + event_manager.add(event_fd.as_fd(),EventSet::IN | EventSet::ERROR | EventSet::HANG_UP,Box::new(move |_:&mut EventManager<()>, event_set: EventSet| { match event_set { EventSet::IN => { counter_clone.fetch_add(1, Ordering::SeqCst); @@ -222,8 +225,8 @@ fn run_multiple_subscriber_types(c: &mut Criterion) { .add( inner_subscribers[i].as_fd(), EventSet::IN | EventSet::ERROR | EventSet::HANG_UP, - Box::new( - move |_: &mut EventManager, event_set: EventSet| match event_set { + Box::new(move |_: &mut EventManager<()>, event_set: EventSet| { + match event_set { EventSet::IN => { data_clone[i].fetch_add(1, Ordering::SeqCst); } @@ -234,8 +237,8 @@ fn run_multiple_subscriber_types(c: &mut Criterion) { panic!("Cannot continue execution. Associated fd was closed."); } _ => {} - }, - ), + } + }), ) .unwrap(); } @@ -244,9 +247,10 @@ fn run_multiple_subscriber_types(c: &mut Criterion) { }) .collect::>(); + let expected = vec![(); usize::try_from(total).unwrap()]; c.bench_function("process_dynamic_dispatch", |b| { b.iter(|| { - assert_eq!(event_manager.wait(Some(0)), Ok(total)); + assert_eq!(event_manager.wait(Some(0)), Ok(expected.as_slice())); }) }); @@ -272,7 +276,7 @@ fn run_with_few_active_events(c: &mut Criterion) { OwnedFd::from_raw_fd(raw_fd) }; - event_manager.add(event_fd.as_fd(),EventSet::IN | EventSet::ERROR | EventSet::HANG_UP,Box::new(move |_:&mut EventManager, event_set: EventSet| { + event_manager.add(event_fd.as_fd(),EventSet::IN | EventSet::ERROR | EventSet::HANG_UP,Box::new(move |_:&mut EventManager<()>, event_set: EventSet| { match event_set { EventSet::IN => (), EventSet::ERROR => { @@ -290,9 +294,10 @@ fn run_with_few_active_events(c: &mut Criterion) { event_fd }).collect::>(); + let expected = vec![(); usize::try_from(active).unwrap()]; c.bench_function("process_dispatch_few_events", |b| { b.iter(|| { - assert_eq!(event_manager.wait(Some(0)), Ok(active)); + assert_eq!(event_manager.wait(Some(0)), Ok(expected.as_slice())); }) }); diff --git a/coverage_config_x86_64.json b/coverage_config_x86_64.json index 33b8aa1..6ccc075 100644 --- a/coverage_config_x86_64.json +++ b/coverage_config_x86_64.json @@ -1,5 +1,5 @@ { - "coverage_score": 81.4, + "coverage_score": 86.9, "exclude_path": "benches/", "crate_features": "" } diff --git a/src/lib.rs b/src/lib.rs index 26334ad..33df8c1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,7 +6,7 @@ use std::os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, OwnedFd, RawFd}; use vmm_sys_util::epoll::EventSet; /// The function thats runs when an event occurs. -type Action = Box; +type Action = Box, EventSet) -> T>; fn errno() -> i32 { // SAFETY: Always safe. @@ -14,13 +14,15 @@ fn errno() -> i32 { } #[derive(Debug)] -pub struct BufferedEventManager { - event_manager: EventManager, +pub struct BufferedEventManager { + event_manager: EventManager, // TODO The length is always unused, a custom type could thus save `size_of::()` bytes. buffer: Vec, + // TODO The length is always unused, a custom type could thus save `size_of::()` bytes. + output_buffer: Vec, } -impl BufferedEventManager { +impl BufferedEventManager { /// Returns a reference to the inner epoll file descriptor. pub fn epfd(&self) -> BorrowedFd { self.event_manager.epfd.as_fd() @@ -31,9 +33,10 @@ impl BufferedEventManager { /// # Errors /// /// When [`libc::epoll_ctl`] returns `-1`. - pub fn add(&mut self, fd: T, events: EventSet, f: Action) -> Result<(), i32> { + pub fn add(&mut self, fd: Fd, events: EventSet, f: Action) -> Result<(), i32> { let res = self.event_manager.add(fd, events, f); self.buffer.reserve(self.event_manager.events.len()); + self.output_buffer.reserve(self.event_manager.events.len()); res } @@ -44,7 +47,7 @@ impl BufferedEventManager { /// # Errors /// /// When [`libc::epoll_ctl`] returns `-1`. - pub fn del(&mut self, fd: T) -> Result { + pub fn del(&mut self, fd: Fd) -> Result { self.event_manager.del(fd) } @@ -60,13 +63,24 @@ impl BufferedEventManager { /// /// When the value given in timeout does not fit within an `i32` e.g. /// `timeout.map(|u| i32::try_from(u).unwrap())`. - pub fn wait(&mut self, timeout: Option) -> Result { + pub fn wait(&mut self, timeout: Option) -> Result<&[T], i32> { // SAFETY: `EventManager::wait` initializes N element from the start of the slice and only // accesses these, thus it will never access uninitialized memory, making this safe. unsafe { self.buffer.set_len(self.buffer.capacity()); + self.output_buffer.set_len(self.output_buffer.capacity()); + } + let n = self + .event_manager + .wait(timeout, &mut self.buffer, &mut self.output_buffer)?; + // SAFETY: This is safe as we call `epoll_wait` within `self.event_manager.wait` with + // `self.buffer.len()` which ensures `n` will be less than or equal to `self.buffer.len()` + // which ensures this slice will only cover valid elements. + unsafe { + Ok(self + .output_buffer + .get_unchecked(..usize::try_from(n).unwrap_unchecked())) } - self.event_manager.wait(timeout, &mut self.buffer) } /// Creates new event manager. @@ -77,29 +91,31 @@ impl BufferedEventManager { pub fn new(close_exec: bool) -> Result { Ok(BufferedEventManager { event_manager: EventManager::new(close_exec)?, - buffer: Vec::new(), + buffer: Vec::with_capacity(0), + output_buffer: Vec::with_capacity(0), }) } pub fn with_capacity(close_exec: bool, capacity: usize) -> Result { Ok(BufferedEventManager { event_manager: EventManager::new(close_exec)?, buffer: Vec::with_capacity(capacity), + output_buffer: Vec::with_capacity(capacity), }) } } -impl Default for BufferedEventManager { +impl Default for BufferedEventManager { fn default() -> Self { Self::new(false).unwrap() } } -pub struct EventManager { +pub struct EventManager { epfd: OwnedFd, - events: HashMap, + events: HashMap>, } -impl std::fmt::Debug for EventManager { +impl std::fmt::Debug for EventManager { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("EventManager") .field("epfd", &self.epfd) @@ -115,7 +131,7 @@ impl std::fmt::Debug for EventManager { } } -impl EventManager { +impl EventManager { /// Returns a reference to the inner epoll file descriptor. pub fn epfd(&self) -> BorrowedFd { self.epfd.as_fd() @@ -126,7 +142,7 @@ impl EventManager { /// # Errors /// /// When [`libc::epoll_ctl`] returns `-1`. - pub fn add(&mut self, fd: T, events: EventSet, f: Action) -> Result<(), i32> { + pub fn add(&mut self, fd: Fd, events: EventSet, f: Action) -> Result<(), i32> { let mut event = libc::epoll_event { events: events.bits(), r#u64: u64::try_from(fd.as_raw_fd()).unwrap(), @@ -156,7 +172,7 @@ impl EventManager { /// # Errors /// /// When [`libc::epoll_ctl`] returns `-1`. - pub fn del(&mut self, fd: T) -> Result { + pub fn del(&mut self, fd: Fd) -> Result { match self.events.remove(&fd.as_raw_fd()) { Some(_) => { // SAFETY: Safe when `fd` is a valid file descriptor. @@ -193,13 +209,14 @@ impl EventManager { &mut self, timeout: Option, buffer: &mut [libc::epoll_event], + output_buffer: &mut [T], ) -> Result { // SAFETY: Always safe. match unsafe { libc::epoll_wait( self.epfd.as_raw_fd(), buffer.as_mut_ptr(), - buffer.len().try_into().unwrap(), + buffer.len().try_into().unwrap_unchecked(), timeout.map_or(-1i32, |u| i32::try_from(u).unwrap()), ) } { @@ -211,11 +228,11 @@ impl EventManager { let event = buffer[i]; // For all events which can fire there exists an entry within `self.events` thus // it is safe to unwrap here. - let f: *const dyn Fn(&mut EventManager, EventSet) = self + let f: *const dyn Fn(&mut EventManager, EventSet) -> T = self .events .get(&i32::try_from(event.u64).unwrap_unchecked()) .unwrap_unchecked(); - (*f)(self, EventSet::from_bits_unchecked(event.events)); + output_buffer[i] = (*f)(self, EventSet::from_bits_unchecked(event.events)); } Ok(n) }, @@ -241,7 +258,7 @@ impl EventManager { } } -impl Default for EventManager { +impl Default for EventManager { fn default() -> Self { Self::new(false).unwrap() } @@ -256,14 +273,14 @@ mod tests { #[test] fn debug() { - let manager = BufferedEventManager::default(); + let manager = BufferedEventManager::<()>::default(); let epfd = manager.epfd().as_raw_fd(); - assert_eq!(format!("{manager:?}"),format!("BufferedEventManager {{ event_manager: EventManager {{ epfd: OwnedFd {{ fd: {epfd} }}, events: {{}} }}, buffer: [] }}")); + assert_eq!(format!("{manager:?}"),format!("BufferedEventManager {{ event_manager: EventManager {{ epfd: OwnedFd {{ fd: {epfd} }}, events: {{}} }}, buffer: [], output_buffer: [] }}")); } #[test] fn del_none() { - let mut manager = BufferedEventManager::with_capacity(false, 10).unwrap(); + let mut manager = BufferedEventManager::<()>::with_capacity(false, 10).unwrap(); // SAFETY: Always safe. let event_fd = unsafe { let fd = libc::eventfd(1, 0); @@ -280,6 +297,7 @@ mod tests { fn delete() { static COUNT: AtomicBool = AtomicBool::new(false); let mut manager = BufferedEventManager::default(); + // We set value to 1 so it will trigger on a read event. // SAFETY: Always safe. let event_fd = unsafe { @@ -287,13 +305,14 @@ mod tests { assert_ne!(fd, -1); fd }; + manager .add( event_fd, EventSet::IN, // A closure which will flip the atomic boolean then remove the event fd from the // interest list. - Box::new(move |x: &mut EventManager, _: EventSet| { + Box::new(move |x: &mut EventManager<()>, _| { // Flips the atomic. let cur = COUNT.load(Ordering::SeqCst); COUNT.store(!cur, Ordering::SeqCst); @@ -309,14 +328,17 @@ mod tests { // The file descriptor has been pre-armed, this will immediately call the respective // closure. - assert_eq!(manager.wait(Some(10)), Ok(1)); + let vec = vec![()]; + assert_eq!(manager.wait(Some(10)), Ok(vec.as_slice())); + // As the closure will flip the atomic boolean we assert it has flipped correctly. assert!(COUNT.load(Ordering::SeqCst)); // At this point we have called the closure, since the closure removes the event fd from the // interest list of the inner epoll, calling this again should timeout as there are no event // fd in the inner epolls interest list which could trigger. - assert_eq!(manager.wait(Some(10)), Ok(0)); + let vec = vec![]; + assert_eq!(manager.wait(Some(10)), Ok(vec.as_slice())); // As the `EventManager::wait` should timeout the value of the atomic boolean should not be // flipped. assert!(COUNT.load(Ordering::SeqCst)); @@ -340,7 +362,7 @@ mod tests { .add( event_fd, EventSet::IN, - Box::new(|_: &mut EventManager, _: EventSet| { + Box::new(|_, _| { // Flips the atomic. let cur = COUNT.load(Ordering::SeqCst); COUNT.store(!cur, Ordering::SeqCst); @@ -352,13 +374,15 @@ mod tests { assert!(!COUNT.load(Ordering::SeqCst)); // As the closure will flip the atomic boolean we assert it has flipped correctly. - assert_eq!(manager.wait(Some(10)), Ok(1)); + let vec = vec![()]; + assert_eq!(manager.wait(Some(10)), Ok(vec.as_slice())); // As the closure will flip the atomic boolean we assert it has flipped correctly. assert!(COUNT.load(Ordering::SeqCst)); // The file descriptor has been pre-armed, this will immediately call the respective // closure. - assert_eq!(manager.wait(Some(10)), Ok(1)); + let vec = vec![()]; + assert_eq!(manager.wait(Some(10)), Ok(vec.as_slice())); // As the closure will flip the atomic boolean we assert it has flipped correctly. assert!(!COUNT.load(Ordering::SeqCst)); } @@ -374,7 +398,7 @@ mod tests { let mut manager = BufferedEventManager::default(); // Setup eventfd's and counters. - let subscribers = (0..100) + let subscribers = (0..SUBSCRIBERS) .map(|_| { // SAFETY: Always safe. let event_fd = unsafe { @@ -389,9 +413,7 @@ mod tests { .add( event_fd.as_fd(), EventSet::IN, - Box::new(move |_: &mut EventManager, _: EventSet| { - counter_clone.fetch_add(1, Ordering::SeqCst); - }), + Box::new(move |_, _| counter_clone.fetch_add(1, Ordering::SeqCst)), ) .unwrap(); @@ -417,10 +439,42 @@ mod tests { } // Check counter are the correct values - let n = i32::try_from(FIRING).unwrap(); - assert_eq!(manager.wait(None), Ok(n)); + let arr = [0; FIRING]; + assert_eq!(manager.wait(None), Ok(arr.as_slice())); for i in set { assert_eq!(subscribers[i].1.load(Ordering::SeqCst), 1); } } + + #[test] + fn results() { + let mut manager = BufferedEventManager::default(); + + // We set value to 1 so it will trigger on a read event. + // SAFETY: Always safe. + let event_fd = unsafe { + let fd = libc::eventfd(1, 0); + assert_ne!(fd, -1); + fd + }; + + manager + .add(event_fd, EventSet::IN, Box::new(|_, _| Ok(()))) + .unwrap(); + + // We set value to 1 so it will trigger on a read event. + // SAFETY: Always safe. + let event_fd = unsafe { + let fd = libc::eventfd(1, 0); + assert_ne!(fd, -1); + fd + }; + + manager + .add(event_fd, EventSet::IN, Box::new(|_, _| Err(()))) + .unwrap(); + + let arr = [Ok(()), Err(())]; + assert_eq!(manager.wait(None), Ok(arr.as_slice())); + } }