diff --git a/libafl/src/corpus/minimizer.rs b/libafl/src/corpus/minimizer.rs index 84c6be3875..7a4d2297a4 100644 --- a/libafl/src/corpus/minimizer.rs +++ b/libafl/src/corpus/minimizer.rs @@ -38,7 +38,6 @@ pub type StdCorpusMinimizer = MapCorpusMinimizer MapCorpusMinimizer where - E: UsesState, E::State: HasCorpus + HasMetadata, TS: TestcaseScore, C: Named, @@ -55,7 +54,6 @@ where impl MapCorpusMinimizer where - E: UsesState, for<'a> O: MapObserver + AsIter<'a, Item = T>, C: AsRef, E::State: HasMetadata + HasCorpus + HasExecutions, diff --git a/libafl/src/events/centralized.rs b/libafl/src/events/centralized.rs index 515c8ec5b9..690d4e20f5 100644 --- a/libafl/src/events/centralized.rs +++ b/libafl/src/events/centralized.rs @@ -7,9 +7,9 @@ // 3. The "main evaluator", the evaluator node that will evaluate all the testcases pass by the centralized event manager to see if the testcases are worth propagating // 4. The "main broker", the gathers the stats from the fuzzer clients and broadcast the newly found testcases from the main evaluator. -use alloc::{boxed::Box, string::String, vec::Vec}; +use alloc::{string::String, vec::Vec}; use core::{fmt::Debug, time::Duration}; -use std::{marker::PhantomData, process}; +use std::process; #[cfg(feature = "llmp_compression")] use libafl_bolts::{ @@ -19,27 +19,31 @@ use libafl_bolts::{ use libafl_bolts::{ llmp::{LlmpClient, LlmpClientDescription, Tag}, shmem::{NopShMemProvider, ShMemProvider}, - tuples::Handle, + tuples::{Handle, MatchNameRef}, ClientId, }; -use serde::{Deserialize, Serialize}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use super::NopEventManager; +use super::{ + default_maybe_report_progress, default_report_progress, CanSerializeObserver, ManagerExit, + NopEventManager, +}; #[cfg(feature = "llmp_compression")] use crate::events::llmp::COMPRESS_THRESHOLD; #[cfg(feature = "scalability_introspection")] use crate::state::HasScalabilityMonitor; use crate::{ + corpus::Corpus, events::{ - AdaptiveSerializer, CustomBufEventResult, Event, EventConfig, EventFirer, EventManager, - EventManagerHooksTuple, EventManagerId, EventProcessor, EventRestarter, - HasCustomBufHandlers, HasEventManagerId, LogSeverity, ProgressReporter, + serialize_observers_adaptive, AdaptiveSerializer, Event, EventConfig, EventFirer, + EventManagerHooksTuple, EventManagerId, EventProcessor, EventRestarter, HasEventManagerId, + LogSeverity, ProgressReporter, }, executors::{Executor, HasObservers}, fuzzer::{EvaluatorObservers, ExecutionProcessor}, - inputs::{Input, NopInput, UsesInput}, + inputs::{Input, UsesInput}, observers::{ObserversTuple, TimeObserver}, - state::{HasExecutions, HasLastReportTime, NopState, State, Stoppable, UsesState}, + state::{HasCorpus, HasExecutions, HasLastReportTime, State, Stoppable, UsesState}, Error, HasMetadata, }; @@ -47,11 +51,8 @@ pub(crate) const _LLMP_TAG_TO_MAIN: Tag = Tag(0x3453453); /// A wrapper manager to implement a main-secondary architecture with another broker #[derive(Debug)] -pub struct CentralizedEventManager +pub struct CentralizedEventManager where - EM: UsesState, - EMH: EventManagerHooksTuple, - S: State, SP: ShMemProvider, { inner: EM, @@ -59,20 +60,11 @@ where client: LlmpClient, #[cfg(feature = "llmp_compression")] compressor: GzipCompressor, - time_ref: Option>, hooks: EMH, is_main: bool, - phantom: PhantomData, } -impl - CentralizedEventManager< - NopEventManager>, - (), - NopState, - NopShMemProvider, - > -{ +impl CentralizedEventManager { /// Creates a builder for [`CentralizedEventManager`] 
#[must_use] pub fn builder() -> CentralizedEventManagerBuilder { @@ -106,17 +98,13 @@ impl CentralizedEventManagerBuilder { } /// Creates a new [`CentralizedEventManager`]. - pub fn build_from_client( + pub fn build_from_client( self, inner: EM, hooks: EMH, client: LlmpClient, - time_obs: Option>, - ) -> Result, Error> + ) -> Result, Error> where - EM: UsesState, - EMH: EventManagerHooksTuple, - S: State, SP: ShMemProvider, { Ok(CentralizedEventManager { @@ -125,9 +113,7 @@ impl CentralizedEventManagerBuilder { client, #[cfg(feature = "llmp_compression")] compressor: GzipCompressor::with_threshold(COMPRESS_THRESHOLD), - time_ref: time_obs, is_main: self.is_main, - phantom: PhantomData, }) } @@ -136,18 +122,14 @@ impl CentralizedEventManagerBuilder { /// If the port is not yet bound, it will act as a broker; otherwise, it /// will act as a client. #[cfg(feature = "std")] - pub fn build_on_port( + pub fn build_on_port( self, inner: EM, hooks: EMH, shmem_provider: SP, port: u16, - time_obs: Option>, - ) -> Result, Error> + ) -> Result, Error> where - EM: UsesState, - EMH: EventManagerHooksTuple, - S: State, SP: ShMemProvider, { let client = LlmpClient::create_attach_to_tcp(shmem_provider, port)?; @@ -157,27 +139,21 @@ impl CentralizedEventManagerBuilder { client, #[cfg(feature = "llmp_compression")] compressor: GzipCompressor::with_threshold(COMPRESS_THRESHOLD), - time_ref: time_obs, is_main: self.is_main, - phantom: PhantomData, }) } /// If a client respawns, it may reuse the existing connection, previously /// stored by [`LlmpClient::to_env()`]. #[cfg(feature = "std")] - pub fn build_existing_client_from_env( + pub fn build_existing_client_from_env( self, inner: EM, hooks: EMH, shmem_provider: SP, env_name: &str, - time_obs: Option>, - ) -> Result, Error> + ) -> Result, Error> where - EM: UsesState, - EMH: EventManagerHooksTuple, - S: State, SP: ShMemProvider, { Ok(CentralizedEventManager { @@ -186,26 +162,20 @@ impl CentralizedEventManagerBuilder { client: LlmpClient::on_existing_from_env(shmem_provider, env_name)?, #[cfg(feature = "llmp_compression")] compressor: GzipCompressor::with_threshold(COMPRESS_THRESHOLD), - time_ref: time_obs, is_main: self.is_main, - phantom: PhantomData, }) } /// Create an existing client from description #[cfg(feature = "std")] - pub fn existing_client_from_description( + pub fn existing_client_from_description( self, inner: EM, hooks: EMH, shmem_provider: SP, description: &LlmpClientDescription, - time_obs: Option>, - ) -> Result, Error> + ) -> Result, Error> where - EM: UsesState, - EMH: EventManagerHooksTuple, - S: State, SP: ShMemProvider, { Ok(CentralizedEventManager { @@ -214,66 +184,18 @@ impl CentralizedEventManagerBuilder { client: LlmpClient::existing_client_from_description(shmem_provider, description)?, #[cfg(feature = "llmp_compression")] compressor: GzipCompressor::with_threshold(COMPRESS_THRESHOLD), - time_ref: time_obs, is_main: self.is_main, - phantom: PhantomData, }) } } -impl UsesState for CentralizedEventManager -where - EM: UsesState, - EMH: EventManagerHooksTuple, - S: State, - SP: ShMemProvider, -{ - type State = EM::State; -} -impl AdaptiveSerializer for CentralizedEventManager +impl EventFirer<::Input, S> + for CentralizedEventManager where - EM: AdaptiveSerializer + UsesState, - EMH: EventManagerHooksTuple, - S: State, - SP: ShMemProvider, -{ - fn serialization_time(&self) -> Duration { - self.inner.serialization_time() - } - fn deserialization_time(&self) -> Duration { - self.inner.deserialization_time() - } - fn 
serializations_cnt(&self) -> usize { - self.inner.serializations_cnt() - } - fn should_serialize_cnt(&self) -> usize { - self.inner.should_serialize_cnt() - } - - fn serialization_time_mut(&mut self) -> &mut Duration { - self.inner.serialization_time_mut() - } - fn deserialization_time_mut(&mut self) -> &mut Duration { - self.inner.deserialization_time_mut() - } - fn serializations_cnt_mut(&mut self) -> &mut usize { - self.inner.serializations_cnt_mut() - } - fn should_serialize_cnt_mut(&mut self) -> &mut usize { - self.inner.should_serialize_cnt_mut() - } - - fn time_ref(&self) -> &Option> { - &self.time_ref - } -} - -impl EventFirer for CentralizedEventManager -where - EM: AdaptiveSerializer + EventFirer + HasEventManagerId, - EMH: EventManagerHooksTuple, - S: State, + S: HasCorpus, SP: ShMemProvider, + EM: HasEventManagerId + EventFirer<::Input, S>, + ::Input: Input, { fn should_send(&self) -> bool { self.inner.should_send() @@ -282,8 +204,8 @@ where #[allow(clippy::match_same_arms)] fn fire( &mut self, - state: &mut Self::State, - mut event: Event<::Input>, + state: &mut S, + mut event: Event<::Input>, ) -> Result<(), Error> { if !self.is_main { // secondary node @@ -315,45 +237,35 @@ where fn log( &mut self, - state: &mut Self::State, + state: &mut S, severity_level: LogSeverity, message: String, ) -> Result<(), Error> { self.inner.log(state, severity_level, message) } - - fn serialize_observers(&mut self, observers: &OT) -> Result>, Error> - where - OT: ObserversTuple + Serialize, - { - const SERIALIZE_TIME_FACTOR: u32 = 4; // twice as much as the normal llmp em's value cuz it does this job twice. - const SERIALIZE_PERCENTAGE_THRESHOLD: usize = 80; - self.inner.serialize_observers_adaptive( - observers, - SERIALIZE_TIME_FACTOR, - SERIALIZE_PERCENTAGE_THRESHOLD, - ) - } - fn configuration(&self) -> EventConfig { self.inner.configuration() } } -impl EventRestarter for CentralizedEventManager +impl EventRestarter for CentralizedEventManager where - EM: EventRestarter, - EMH: EventManagerHooksTuple, - S: State, + EM: EventRestarter, SP: ShMemProvider, { #[inline] - fn on_restart(&mut self, state: &mut Self::State) -> Result<(), Error> { + fn on_restart(&mut self, state: &mut S) -> Result<(), Error> { self.client.await_safe_to_unmap_blocking(); self.inner.on_restart(state)?; Ok(()) } +} +impl ManagerExit for CentralizedEventManager +where + EM: ManagerExit, + SP: ShMemProvider, +{ fn send_exiting(&mut self) -> Result<(), Error> { self.client.sender_mut().send_exiting()?; self.inner.send_exiting() @@ -366,26 +278,17 @@ where } } -impl EventProcessor for CentralizedEventManager +impl EventProcessor for CentralizedEventManager where - EM: AdaptiveSerializer + EventProcessor + EventFirer + HasEventManagerId, - EMH: EventManagerHooksTuple, - E: HasObservers + Executor, - E::Observers: - ObserversTuple<::Input, ::State> + Serialize, - for<'a> E::Observers: Deserialize<'a>, - S: State, - Self::State: HasExecutions + HasMetadata, + E: HasObservers, + E::Observers: DeserializeOwned, + EM: EventProcessor + HasEventManagerId + EventFirer<::Input, S>, + EMH: EventManagerHooksTuple<::Input, S>, SP: ShMemProvider, - Z: EvaluatorObservers - + ExecutionProcessor, + S: HasCorpus + Stoppable, + ::Input: Input, { - fn process( - &mut self, - fuzzer: &mut Z, - state: &mut Self::State, - executor: &mut E, - ) -> Result { + fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result { if self.is_main { // main node self.receive_from_secondary(fuzzer, state, executor) @@ -402,55 +305,37 @@ 
where } } -impl EventManager for CentralizedEventManager +#[cfg(feature = "std")] +impl CanSerializeObserver for CentralizedEventManager where - E: HasObservers + Executor, - E::Observers: - ObserversTuple<::Input, ::State> + Serialize, - for<'a> E::Observers: Deserialize<'a>, - EM: AdaptiveSerializer + EventManager, - EM::State: HasExecutions + HasMetadata + HasLastReportTime, - EMH: EventManagerHooksTuple, - S: State, + EMH: AdaptiveSerializer, SP: ShMemProvider, - Z: EvaluatorObservers - + ExecutionProcessor, + OT: Serialize + MatchNameRef, { + fn serialize_observers(&mut self, observers: &OT) -> Result>, Error> { + serialize_observers_adaptive::(self, observers, 2, 80) + } } -impl HasCustomBufHandlers for CentralizedEventManager +impl ProgressReporter for CentralizedEventManager where - EM: HasCustomBufHandlers, - EMH: EventManagerHooksTuple, - S: State, SP: ShMemProvider, { - /// Adds a custom buffer handler that will run for each incoming `CustomBuf` event. - fn add_custom_buf_handler( + fn maybe_report_progress( &mut self, - handler: Box< - dyn FnMut(&mut Self::State, &str, &[u8]) -> Result, - >, - ) { - self.inner.add_custom_buf_handler(handler); + state: &mut S, + monitor_timeout: Duration, + ) -> Result<(), Error> { + default_maybe_report_progress(self, state, monitor_timeout) } -} -impl ProgressReporter for CentralizedEventManager -where - EM: AdaptiveSerializer + ProgressReporter + HasEventManagerId, - EM::State: HasMetadata + HasExecutions + HasLastReportTime, - EMH: EventManagerHooksTuple, - S: State, - SP: ShMemProvider, -{ + fn report_progress(&mut self, state: &mut S) -> Result<(), Error> { + default_report_progress(self, state) + } } -impl HasEventManagerId for CentralizedEventManager +impl HasEventManagerId for CentralizedEventManager where - EM: HasEventManagerId + UsesState, - EMH: EventManagerHooksTuple, - S: State, SP: ShMemProvider, { fn mgr_id(&self) -> EventManagerId { @@ -458,11 +343,8 @@ where } } -impl CentralizedEventManager +impl CentralizedEventManager where - EM: UsesState, - EMH: EventManagerHooksTuple, - S: State, SP: ShMemProvider, { /// Describe the client event manager's LLMP parts in a restorable fashion @@ -483,11 +365,8 @@ where } } -impl CentralizedEventManager +impl CentralizedEventManager where - EM: UsesState + EventFirer + AdaptiveSerializer + HasEventManagerId, - EMH: EventManagerHooksTuple, - S: State + Stoppable, SP: ShMemProvider, { #[cfg(feature = "llmp_compression")] @@ -523,20 +402,19 @@ where Ok(()) } - fn receive_from_secondary( + fn receive_from_secondary( &mut self, fuzzer: &mut Z, - state: &mut ::State, + state: &mut S, executor: &mut E, ) -> Result where - E: Executor::State> + HasObservers, - E::Observers: - ObserversTuple<::Input, ::State> + Serialize, - ::State: UsesInput + HasExecutions + HasMetadata, - for<'a> E::Observers: Deserialize<'a>, - Z: ExecutionProcessor::State> - + EvaluatorObservers, + S: HasCorpus + Stoppable, + ::Input: DeserializeOwned + Input, + EMH: EventManagerHooksTuple<::Input, S>, + E: HasObservers, + E::Observers: DeserializeOwned, + EM: HasEventManagerId + EventFirer<::Input, S>, { // TODO: Get around local event copy by moving handle_in_client let self_id = self.client.sender().id(); @@ -561,8 +439,7 @@ where } else { msg }; - let event: Event<<::State as UsesInput>::Input> = - postcard::from_bytes(event_bytes)?; + let event: Event<::Input> = postcard::from_bytes(event_bytes)?; log::debug!("Processor received message {}", event.name_detailed()); self.handle_in_main(fuzzer, executor, state, client_id, 
event)?; count += 1; @@ -571,22 +448,21 @@ where } // Handle arriving events in the main node - fn handle_in_main( + fn handle_in_main( &mut self, fuzzer: &mut Z, executor: &mut E, - state: &mut ::State, + state: &mut S, client_id: ClientId, - event: Event<<::State as UsesInput>::Input>, + event: Event<::Input>, ) -> Result<(), Error> where - E: Executor::State> + HasObservers, - E::Observers: - ObserversTuple<::Input, ::State> + Serialize, - ::State: UsesInput + HasExecutions + HasMetadata, - for<'a> E::Observers: Deserialize<'a> + Serialize, - Z: ExecutionProcessor::State> - + EvaluatorObservers, + E: HasObservers, + E::Observers: DeserializeOwned, + S: HasCorpus + Stoppable, + EMH: EventManagerHooksTuple<::Input, S>, + ::Input: Input, + EM: HasEventManagerId + EventFirer<::Input, S>, { log::debug!("handle_in_main!"); diff --git a/libafl/src/events/events_hooks/mod.rs b/libafl/src/events/events_hooks/mod.rs index b34b1810f6..e7736212e5 100644 --- a/libafl/src/events/events_hooks/mod.rs +++ b/libafl/src/events/events_hooks/mod.rs @@ -4,20 +4,17 @@ //! other clients use libafl_bolts::ClientId; -use crate::{events::Event, state::State, Error}; +use crate::{events::Event, Error}; /// The `broker_hooks` that are run before and after the event manager calls `handle_in_client` -pub trait EventManagerHook -where - S: State, -{ +pub trait EventManagerHook { /// The hook that runs before `handle_in_client` /// Return false if you want to cancel the subsequent event handling fn pre_exec( &mut self, state: &mut S, client_id: ClientId, - event: &Event, + event: &Event, ) -> Result; /// Triggered when the even manager decides to fire the event after processing @@ -25,7 +22,7 @@ where &mut self, _state: &mut S, _client_id: ClientId, - _event: &Event, + _event: &Event, ) -> Result<(), Error> { Ok(()) } @@ -38,16 +35,13 @@ where } /// The tuples contains `broker_hooks` to be executed for `handle_in_client` -pub trait EventManagerHooksTuple -where - S: State, -{ +pub trait EventManagerHooksTuple { /// The hook that runs before `handle_in_client` fn pre_exec_all( &mut self, state: &mut S, client_id: ClientId, - event: &Event, + event: &Event, ) -> Result; /// Ran when the Event Manager decides to accept an event and propagates it @@ -55,23 +49,20 @@ where &mut self, state: &mut S, client_id: ClientId, - event: &Event, + event: &Event, ) -> Result<(), Error>; /// The hook that runs after `handle_in_client` fn post_exec_all(&mut self, state: &mut S, client_id: ClientId) -> Result; } -impl EventManagerHooksTuple for () -where - S: State, -{ +impl EventManagerHooksTuple for () { /// The hook that runs before `handle_in_client` fn pre_exec_all( &mut self, _state: &mut S, _client_id: ClientId, - _event: &Event, + _event: &Event, ) -> Result { Ok(true) } @@ -80,7 +71,7 @@ where &mut self, _state: &mut S, _client_id: ClientId, - _event: &Event, + _event: &Event, ) -> Result<(), Error> { Ok(()) } @@ -91,18 +82,17 @@ where } } -impl EventManagerHooksTuple for (Head, Tail) +impl EventManagerHooksTuple for (Head, Tail) where - Head: EventManagerHook, - Tail: EventManagerHooksTuple, - S: State, + Head: EventManagerHook, + Tail: EventManagerHooksTuple, { /// The hook that runs before `handle_in_client` fn pre_exec_all( &mut self, state: &mut S, client_id: ClientId, - event: &Event, + event: &Event, ) -> Result { let first = self.0.pre_exec(state, client_id, event)?; let second = self.1.pre_exec_all(state, client_id, event)?; @@ -113,7 +103,7 @@ where &mut self, state: &mut S, client_id: ClientId, - event: &Event, + 
event: &Event, ) -> Result<(), Error> { self.0.on_fire(state, client_id, event)?; self.1.on_fire_all(state, client_id, event) diff --git a/libafl/src/events/launcher.rs b/libafl/src/events/launcher.rs index fcd75cafd2..9842298e88 100644 --- a/libafl/src/events/launcher.rs +++ b/libafl/src/events/launcher.rs @@ -48,6 +48,8 @@ use libafl_bolts::{ shmem::ShMemProvider, tuples::{tuple_list, Handle}, }; +#[cfg(all(unix, feature = "std", feature = "fork"))] +use serde::{de::DeserializeOwned, Serialize}; #[cfg(feature = "std")] use typed_builder::TypedBuilder; @@ -58,13 +60,16 @@ use super::StdLlmpEventHook; use crate::events::multi_machine::NodeDescriptor; #[cfg(all(unix, feature = "std", feature = "fork", feature = "multi_machine"))] use crate::events::multi_machine::TcpMultiMachineHooks; -#[cfg(all(unix, feature = "std", feature = "fork"))] -use crate::events::{centralized::CentralizedEventManager, CentralizedLlmpHook}; -#[cfg(all(unix, feature = "std", feature = "fork"))] -use crate::inputs::UsesInput; use crate::observers::TimeObserver; #[cfg(all(unix, feature = "std", feature = "fork"))] use crate::state::UsesState; +#[cfg(all(unix, feature = "std", feature = "fork"))] +use crate::{ + corpus::Corpus, + events::{centralized::CentralizedEventManager, CentralizedLlmpHook}, + inputs::Input, + state::HasCorpus, +}; #[cfg(feature = "std")] use crate::{ events::{ @@ -175,7 +180,8 @@ where #[cfg(all(unix, feature = "std", feature = "fork"))] pub fn launch(&mut self) -> Result<(), Error> where - S: State + HasExecutions, + S: HasExecutions + Serialize + HasCorpus + DeserializeOwned, + ::Input: Input, CF: FnOnce(Option, LlmpRestartingEventManager<(), S, SP>, CoreId) -> Result<(), Error>, { Self::launch_with_hooks(self, tuple_list!()) @@ -186,7 +192,7 @@ where #[allow(unused_mut, clippy::match_wild_err_arm)] pub fn launch(&mut self) -> Result<(), Error> where - S: State + HasExecutions, + S: HasExecutions, CF: FnOnce(Option, LlmpRestartingEventManager<(), S, SP>, CoreId) -> Result<(), Error>, { Self::launch_with_hooks(self, tuple_list!()) @@ -205,8 +211,9 @@ where #[allow(clippy::too_many_lines)] pub fn launch_with_hooks(&mut self, hooks: EMH) -> Result<(), Error> where - S: State + HasExecutions, - EMH: EventManagerHooksTuple + Clone + Copy, + S: HasCorpus + HasExecutions + DeserializeOwned + Serialize, + ::Input: Input, + EMH: EventManagerHooksTuple<::Input, S> + Clone + Copy, CF: FnOnce(Option, LlmpRestartingEventManager, CoreId) -> Result<(), Error>, { if self.cores.ids.is_empty() { @@ -341,8 +348,6 @@ where #[allow(unused_mut, clippy::match_wild_err_arm, clippy::too_many_lines)] pub fn launch_with_hooks(&mut self, hooks: EMH) -> Result<(), Error> where - S: State + HasExecutions, - EMH: EventManagerHooksTuple + Clone + Copy, CF: FnOnce(Option, LlmpRestartingEventManager, CoreId) -> Result<(), Error>, { use libafl_bolts::core_affinity; @@ -576,16 +581,16 @@ where /// Launch a standard Centralized-based fuzzer pub fn launch(&mut self) -> Result<(), Error> where - S: State, - S::Input: Send + Sync + 'static, + S: DeserializeOwned + HasCorpus + Serialize, + ::Input: Input + 'static + Send + Sync, CF: FnOnce( Option, - CentralizedEventManager, (), S, SP>, + CentralizedEventManager, (), SP>, CoreId, ) -> Result<(), Error>, MF: FnOnce( Option, - CentralizedEventManager, (), S, SP>, + CentralizedEventManager, (), SP>, CoreId, ) -> Result<(), Error>, { @@ -628,17 +633,15 @@ where secondary_inner_mgr_builder: EMB, ) -> Result<(), Error> where - S: State, - S::Input: Send + Sync + 'static, - CF: 
FnOnce(Option, CentralizedEventManager, CoreId) -> Result<(), Error>, - EM: UsesState, + S: HasCorpus, EMB: FnOnce(&Self, CoreId) -> Result<(Option, EM), Error>, + ::Input: Input + Send + Sync + 'static, + CF: FnOnce(Option, CentralizedEventManager, CoreId) -> Result<(), Error>, MF: FnOnce( Option, - CentralizedEventManager, // No broker_hooks for centralized EM + CentralizedEventManager, // No broker_hooks for centralized EM CoreId, ) -> Result<(), Error>, - <::State as UsesInput>::Input: Send + Sync + 'static, { let mut main_inner_mgr_builder = Some(main_inner_mgr_builder); let mut secondary_inner_mgr_builder = Some(secondary_inner_mgr_builder); @@ -717,7 +720,6 @@ where tuple_list!(), self.shmem_provider.clone(), self.centralized_broker_port, - self.time_obs.clone(), )?; self.main_run_client.take().unwrap()(state, c_mgr, *bind_to)?; @@ -735,7 +737,6 @@ where tuple_list!(), self.shmem_provider.clone(), self.centralized_broker_port, - self.time_obs.clone(), )?; self.secondary_run_client.take().unwrap()(state, c_mgr, *bind_to)?; @@ -758,7 +759,7 @@ where } = unsafe { TcpMultiMachineHooks::builder() .node_descriptor(self.multi_machine_node_descriptor.clone()) - .build::<<::State as UsesInput>::Input>()? + .build::<::Input>()? }; let mut brokers = Brokers::new(); @@ -773,7 +774,8 @@ where ); #[cfg(not(feature = "multi_machine"))] - let centralized_hooks = tuple_list!(CentralizedLlmpHook::::new()?); + let centralized_hooks = + tuple_list!(CentralizedLlmpHook::<::Input>::new()?); // TODO switch to false after solving the bug let mut broker = LlmpBroker::with_keep_pages_attach_to_tcp( @@ -797,12 +799,13 @@ where log::info!("I am broker!!."); #[cfg(not(feature = "multi_machine"))] - let llmp_hook = - tuple_list!(StdLlmpEventHook::::new(self.monitor.clone())?); + let llmp_hook = tuple_list!(StdLlmpEventHook::<::Input, MT>::new( + self.monitor.clone() + )?); #[cfg(feature = "multi_machine")] let llmp_hook = tuple_list!( - StdLlmpEventHook::::new(self.monitor.clone())?, + StdLlmpEventHook::<::Input, MT>::new(self.monitor.clone())?, multi_machine_sender_hook, ); diff --git a/libafl/src/events/llmp/mgr.rs b/libafl/src/events/llmp/mgr.rs index 1153c30610..c06513e839 100644 --- a/libafl/src/events/llmp/mgr.rs +++ b/libafl/src/events/llmp/mgr.rs @@ -3,7 +3,7 @@ #[cfg(feature = "std")] use alloc::string::ToString; -use alloc::{boxed::Box, vec::Vec}; +use alloc::vec::Vec; use core::{marker::PhantomData, time::Duration}; #[cfg(feature = "std")] use std::net::TcpStream; @@ -17,7 +17,7 @@ use libafl_bolts::{ current_time, llmp::{LlmpClient, LlmpClientDescription, LLMP_FLAG_FROM_MM}, shmem::{NopShMemProvider, ShMemProvider}, - tuples::Handle, + tuples::{Handle, MatchNameRef}, ClientId, }; #[cfg(feature = "std")] @@ -25,22 +25,25 @@ use libafl_bolts::{ llmp::{recv_tcp_msg, send_tcp_msg, TcpRequest, TcpResponse}, IP_LOCALHOST, }; -use serde::{Deserialize, Serialize}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; #[cfg(feature = "llmp_compression")] use crate::events::llmp::COMPRESS_THRESHOLD; use crate::{ + corpus::{Corpus, HasCurrentCorpusId}, events::{ + default_maybe_report_progress, default_on_restart, default_report_progress, + events_hooks::EventManagerHooksTuple, llmp::{LLMP_TAG_EVENT_TO_BOTH, _LLMP_TAG_EVENT_TO_BROKER}, - AdaptiveSerializer, CustomBufEventResult, CustomBufHandlerFn, Event, EventConfig, - EventFirer, EventManager, EventManagerHooksTuple, EventManagerId, EventProcessor, - EventRestarter, HasCustomBufHandlers, HasEventManagerId, ProgressReporter, + 
serialize_observers_adaptive, AdaptiveSerializer, CanSerializeObserver, Event, EventConfig, + EventFirer, EventManagerId, EventProcessor, EventRestarter, HasEventManagerId, ManagerExit, + ProgressReporter, }, executors::{Executor, HasObservers}, fuzzer::{Evaluator, EvaluatorObservers, ExecutionProcessor}, - inputs::{NopInput, UsesInput}, + inputs::{Input, NopInput}, observers::{ObserversTuple, TimeObserver}, - state::{HasExecutions, HasImported, HasLastReportTime, NopState, State, UsesState}, + state::{HasCorpus, HasExecutions, HasImported, HasLastReportTime, NopState, Stoppable}, Error, HasMetadata, }; @@ -48,7 +51,6 @@ use crate::{ /// using low-level message passing, `llmp`. pub struct LlmpEventManager where - S: State, SP: ShMemProvider, { /// We only send 1 testcase for every `throttle` second @@ -60,8 +62,6 @@ where hooks: EMH, /// The LLMP client for inter process communication llmp: LlmpClient, - /// The custom buf handler - custom_buf_handlers: Vec>>, #[cfg(feature = "llmp_compression")] compressor: GzipCompressor, /// The configuration defines this specific fuzzer. @@ -146,7 +146,6 @@ impl LlmpEventManagerBuilder { ) -> Result, Error> where SP: ShMemProvider, - S: State, { Ok(LlmpEventManager { throttle: self.throttle, @@ -163,7 +162,6 @@ impl LlmpEventManagerBuilder { should_serialize_cnt: 0, time_ref, phantom: PhantomData, - custom_buf_handlers: vec![], }) } @@ -179,7 +177,6 @@ impl LlmpEventManagerBuilder { ) -> Result, Error> where SP: ShMemProvider, - S: State, { let llmp = LlmpClient::create_attach_to_tcp(shmem_provider, port)?; Ok(LlmpEventManager { @@ -197,7 +194,6 @@ impl LlmpEventManagerBuilder { should_serialize_cnt: 0, time_ref, phantom: PhantomData, - custom_buf_handlers: vec![], }) } @@ -213,7 +209,6 @@ impl LlmpEventManagerBuilder { ) -> Result, Error> where SP: ShMemProvider, - S: State, { let llmp = LlmpClient::on_existing_from_env(shmem_provider, env_name)?; Ok(LlmpEventManager { @@ -231,7 +226,6 @@ impl LlmpEventManagerBuilder { should_serialize_cnt: 0, time_ref, phantom: PhantomData, - custom_buf_handlers: vec![], }) } @@ -245,7 +239,6 @@ impl LlmpEventManagerBuilder { ) -> Result, Error> where SP: ShMemProvider, - S: State, { let llmp = LlmpClient::existing_client_from_description(shmem_provider, description)?; Ok(LlmpEventManager { @@ -263,7 +256,6 @@ impl LlmpEventManagerBuilder { should_serialize_cnt: 0, time_ref, phantom: PhantomData, - custom_buf_handlers: vec![], }) } } @@ -271,7 +263,6 @@ impl LlmpEventManagerBuilder { impl AdaptiveSerializer for LlmpEventManager where SP: ShMemProvider, - S: State, { fn serialization_time(&self) -> Duration { self.serialization_time @@ -304,10 +295,20 @@ where } } +#[cfg(feature = "std")] +impl CanSerializeObserver for LlmpEventManager +where + SP: ShMemProvider, + OT: Serialize + MatchNameRef, +{ + fn serialize_observers(&mut self, observers: &OT) -> Result>, Error> { + serialize_observers_adaptive::(self, observers, 2, 80) + } +} + impl core::fmt::Debug for LlmpEventManager where SP: ShMemProvider, - S: State, { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { let mut debug_struct = f.debug_struct("LlmpEventManager"); @@ -325,7 +326,6 @@ where impl Drop for LlmpEventManager where SP: ShMemProvider, - S: State, { /// LLMP clients will have to wait until their pages are mapped by somebody. 
fn drop(&mut self) { @@ -335,7 +335,6 @@ where impl LlmpEventManager where - S: State, SP: ShMemProvider, { /// Calling this function will tell the llmp broker that this client is exiting @@ -386,8 +385,6 @@ where impl LlmpEventManager where - EMH: EventManagerHooksTuple, - S: State + HasExecutions + HasMetadata + HasImported, SP: ShMemProvider, { // Handle arriving events in the client @@ -398,15 +395,14 @@ where executor: &mut E, state: &mut S, client_id: ClientId, - event: Event, + event: Event<::Input>, ) -> Result<(), Error> where - E: Executor + HasObservers, - E::Observers: ObserversTuple + Serialize, - for<'a> E::Observers: Deserialize<'a>, - Z: ExecutionProcessor - + EvaluatorObservers - + Evaluator, + S: HasCorpus + HasImported + Stoppable, + EMH: EventManagerHooksTuple<::Input, S>, + ::Input: Input, + E: HasObservers, + E::Observers: DeserializeOwned, { if !self.hooks.pre_exec_all(state, client_id, &event)? { return Ok(()); @@ -461,13 +457,6 @@ where } } } - Event::CustomBuf { tag, buf } => { - for handler in &mut self.custom_buf_handlers { - if handler(state, &tag, &buf)? == CustomBufEventResult::Handled { - break; - } - } - } Event::Stop => { state.request_stop(); } @@ -484,7 +473,7 @@ where } } -impl LlmpEventManager { +impl LlmpEventManager { /// Send information that this client is exiting. /// The other side may free up all allocated memory. /// We are no longer allowed to send anything afterwards. @@ -493,17 +482,10 @@ impl LlmpEventManager { } } -impl UsesState for LlmpEventManager +impl EventFirer<::Input, S> for LlmpEventManager where - S: State, - SP: ShMemProvider, -{ - type State = S; -} - -impl EventFirer for LlmpEventManager -where - S: State, + S: HasCorpus, + ::Input: Serialize, SP: ShMemProvider, { fn should_send(&self) -> bool { @@ -517,8 +499,8 @@ where #[cfg(feature = "llmp_compression")] fn fire( &mut self, - _state: &mut Self::State, - event: Event<::Input>, + _state: &mut S, + event: Event<::Input>, ) -> Result<(), Error> { let serialized = postcard::to_allocvec(&event)?; let flags = LLMP_FLAG_INITIALIZED; @@ -551,27 +533,23 @@ where Ok(()) } - fn serialize_observers(&mut self, observers: &OT) -> Result>, Error> - where - OT: ObserversTuple + Serialize, - { - const SERIALIZE_TIME_FACTOR: u32 = 2; - const SERIALIZE_PERCENTAGE_THRESHOLD: usize = 80; - self.serialize_observers_adaptive( - observers, - SERIALIZE_TIME_FACTOR, - SERIALIZE_PERCENTAGE_THRESHOLD, - ) - } - fn configuration(&self) -> EventConfig { self.configuration } } -impl EventRestarter for LlmpEventManager +impl EventRestarter for LlmpEventManager +where + SP: ShMemProvider, + S: HasCurrentCorpusId, +{ + fn on_restart(&mut self, state: &mut S) -> Result<(), Error> { + default_on_restart(self, state) + } +} + +impl ManagerExit for LlmpEventManager where - S: State, SP: ShMemProvider, { /// The LLMP client needs to wait until a broker has mapped all pages before shutting down. 
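// NOTE (review aid, not part of the patch): the hunk above splits the old `EventRestarter`
// responsibilities in two: `on_restart` stays on `EventRestarter` and now delegates to the
// shared `default_on_restart` helper (added further down, in events/mod.rs), while
// `send_exiting`/`await_restart_safe` move to the new `ManagerExit` trait. The angle-bracketed
// generics were lost when this diff was extracted, so the `<S>` placements in the sketch below
// are a hedged reconstruction of that helper; the body and bounds are transcribed from the
// visible patch text.
fn default_on_restart<S>(
    restarter: &mut (impl EventRestarter<S> + ManagerExit),
    state: &mut S,
) -> Result<(), Error>
where
    S: HasCurrentStageId, // provides `state.on_restart()`
{
    // record/clear the current stage id before the state is saved for recovery
    state.on_restart()?;
    // block until it is safe to exit or hand over (for LLMP: until the broker mapped our pages)
    restarter.await_restart_safe();
    Ok(())
}
// An event manager then implements `EventRestarter::on_restart` as a one-liner that calls
// `default_on_restart(self, state)`, exactly as `LlmpEventManager` does in the hunk above,
// and keeps `send_exiting`/`await_restart_safe` in a separate `ManagerExit` impl.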
@@ -582,24 +560,16 @@ where } } -impl EventProcessor for LlmpEventManager +impl EventProcessor for LlmpEventManager where - EMH: EventManagerHooksTuple, - S: State + HasExecutions + HasMetadata + HasImported, + E: HasObservers, SP: ShMemProvider, - E: HasObservers + Executor, - E::Observers: ObserversTuple + Serialize, - for<'a> E::Observers: Deserialize<'a>, - Z: ExecutionProcessor - + EvaluatorObservers - + Evaluator, + E::Observers: DeserializeOwned, + S: HasCorpus + HasImported + Stoppable, + EMH: EventManagerHooksTuple<::Input, S>, + ::Input: DeserializeOwned + Input, { - fn process( - &mut self, - fuzzer: &mut Z, - state: &mut Self::State, - executor: &mut E, - ) -> Result { + fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result { // TODO: Get around local event copy by moving handle_in_client let self_id = self.llmp.sender().id(); let mut count = 0; @@ -623,7 +593,7 @@ where } else { msg }; - let event: Event = postcard::from_bytes(event_bytes)?; + let event: Event<::Input> = postcard::from_bytes(event_bytes)?; log::debug!("Received event in normal llmp {}", event.name_detailed()); // If the message comes from another machine, do not @@ -643,43 +613,26 @@ where } } -impl EventManager for LlmpEventManager +impl ProgressReporter for LlmpEventManager where - E: HasObservers + Executor, - E::Observers: ObserversTuple + Serialize, - for<'a> E::Observers: Deserialize<'a>, - EMH: EventManagerHooksTuple, - S: State + HasExecutions + HasMetadata + HasLastReportTime + HasImported, SP: ShMemProvider, - Z: ExecutionProcessor - + EvaluatorObservers - + Evaluator, + S: HasMetadata + HasExecutions + HasLastReportTime, { -} - -impl HasCustomBufHandlers for LlmpEventManager -where - S: State, - SP: ShMemProvider, -{ - fn add_custom_buf_handler( + fn maybe_report_progress( &mut self, - handler: Box Result>, - ) { - self.custom_buf_handlers.push(handler); + state: &mut S, + monitor_timeout: Duration, + ) -> Result<(), Error> { + default_maybe_report_progress(self, state, monitor_timeout) } -} -impl ProgressReporter for LlmpEventManager -where - S: State + HasExecutions + HasMetadata + HasLastReportTime, - SP: ShMemProvider, -{ + fn report_progress(&mut self, state: &mut S) -> Result<(), Error> { + default_report_progress(self, state) + } } impl HasEventManagerId for LlmpEventManager where - S: State, SP: ShMemProvider, { /// Gets the id assigned to this staterestorer. diff --git a/libafl/src/events/llmp/mod.rs b/libafl/src/events/llmp/mod.rs index bdbe32f4ba..086656be77 100644 --- a/libafl/src/events/llmp/mod.rs +++ b/libafl/src/events/llmp/mod.rs @@ -1,7 +1,6 @@ //! 
LLMP-backed event manager for scalable multi-processed fuzzing -use alloc::{boxed::Box, vec::Vec}; -use core::{marker::PhantomData, time::Duration}; +use core::{fmt::Debug, marker::PhantomData, time::Duration}; #[cfg(feature = "llmp_compression")] use libafl_bolts::{ @@ -13,14 +12,15 @@ use libafl_bolts::{ shmem::{NopShMemProvider, ShMemProvider}, ClientId, }; -use serde::Deserialize; +use serde::{de::DeserializeOwned, Deserialize}; use crate::{ - events::{CustomBufEventResult, CustomBufHandlerFn, Event, EventFirer}, + corpus::Corpus, + events::{Event, EventFirer}, executors::{Executor, HasObservers}, fuzzer::{EvaluatorObservers, ExecutionProcessor}, - inputs::{Input, InputConverter, NopInput, NopInputConverter, UsesInput}, - state::{HasExecutions, NopState, State, Stoppable, UsesState}, + inputs::{Input, InputConverter, NopInput, NopInputConverter}, + state::{HasCorpus, HasExecutions, NopState, State, Stoppable}, Error, HasMetadata, }; @@ -84,19 +84,13 @@ impl LlmpShouldSaveState { } /// A manager-like llmp client that converts between input types -pub struct LlmpEventConverter +pub struct LlmpEventConverter where - S: UsesInput, SP: ShMemProvider, - IC: InputConverter, - ICB: InputConverter, - DI: Input, { throttle: Option, llmp: LlmpClient, last_sent: Duration, - /// The custom buf handler - custom_buf_handlers: Vec>>, #[cfg(feature = "llmp_compression")] compressor: GzipCompressor, converter: Option, @@ -106,7 +100,6 @@ where impl LlmpEventConverter< - NopInput, NopInputConverter, NopInputConverter, NopState, @@ -142,18 +135,14 @@ impl LlmpEventConverterBuilder { } /// Create a event converter from a raw llmp client - pub fn build_from_client( + pub fn build_from_client( self, llmp: LlmpClient, converter: Option, converter_back: Option, - ) -> Result, Error> + ) -> Result, Error> where SP: ShMemProvider, - S: UsesInput, - IC: InputConverter, - ICB: InputConverter, - DI: Input, { Ok(LlmpEventConverter { throttle: self.throttle, @@ -164,25 +153,20 @@ impl LlmpEventConverterBuilder { converter, converter_back, phantom: PhantomData, - custom_buf_handlers: vec![], }) } /// Create a client from port and the input converters #[cfg(feature = "std")] - pub fn build_on_port( + pub fn build_on_port( self, shmem_provider: SP, port: u16, converter: Option, converter_back: Option, - ) -> Result, Error> + ) -> Result, Error> where SP: ShMemProvider, - S: UsesInput, - IC: InputConverter, - ICB: InputConverter, - DI: Input, { let llmp = LlmpClient::create_attach_to_tcp(shmem_provider, port)?; Ok(LlmpEventConverter { @@ -194,25 +178,20 @@ impl LlmpEventConverterBuilder { converter, converter_back, phantom: PhantomData, - custom_buf_handlers: vec![], }) } /// If a client respawns, it may reuse the existing connection, previously stored by [`LlmpClient::to_env()`]. 
#[cfg(feature = "std")] - pub fn build_existing_client_from_env( + pub fn build_existing_client_from_env( self, shmem_provider: SP, env_name: &str, converter: Option, converter_back: Option, - ) -> Result, Error> + ) -> Result, Error> where SP: ShMemProvider, - S: UsesInput, - IC: InputConverter, - ICB: InputConverter, - DI: Input, { let llmp = LlmpClient::on_existing_from_env(shmem_provider, env_name)?; Ok(LlmpEventConverter { @@ -224,18 +203,15 @@ impl LlmpEventConverterBuilder { converter, converter_back, phantom: PhantomData, - custom_buf_handlers: vec![], }) } } -impl core::fmt::Debug for LlmpEventConverter +impl Debug for LlmpEventConverter where SP: ShMemProvider, - S: UsesInput, - IC: InputConverter, - ICB: InputConverter, - DI: Input, + ICB: Debug, + IC: Debug, { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { let mut debug_struct = f.debug_struct("LlmpEventConverter"); @@ -251,13 +227,9 @@ where } } -impl LlmpEventConverter +impl LlmpEventConverter where - S: UsesInput + HasExecutions + HasMetadata + Stoppable, SP: ShMemProvider, - IC: InputConverter, - ICB: InputConverter, - DI: Input, { // TODO other new_* routines @@ -283,7 +255,7 @@ where } // Handle arriving events in the client - fn handle_in_client( + fn handle_in_client( &mut self, fuzzer: &mut Z, executor: &mut E, @@ -293,10 +265,8 @@ where event: Event, ) -> Result<(), Error> where - E: Executor + HasObservers, - EM: UsesState + EventFirer, - for<'a> E::Observers: Deserialize<'a>, - Z: ExecutionProcessor + EvaluatorObservers, + ICB: InputConverter::Input, From = DI>, + S: HasCorpus, { match event { Event::NewTestcase { @@ -308,7 +278,7 @@ where return Ok(()); }; - let res = fuzzer.evaluate_input_with_observers::( + let res = fuzzer.evaluate_input_with_observers( state, executor, manager, @@ -321,14 +291,6 @@ where } Ok(()) } - Event::CustomBuf { tag, buf } => { - for handler in &mut self.custom_buf_handlers { - if handler(state, &tag, &buf)? 
== CustomBufEventResult::Handled { - break; - } - } - Ok(()) - } Event::Stop => Ok(()), _ => Err(Error::unknown(format!( "Received illegal message that message should not have arrived: {:?}.", @@ -339,7 +301,7 @@ where /// Handle arriving events in the client #[allow(clippy::unused_self)] - pub fn process( + pub fn process( &mut self, fuzzer: &mut Z, state: &mut S, @@ -347,10 +309,9 @@ where manager: &mut EM, ) -> Result where - E: Executor + HasObservers, - EM: UsesState + EventFirer, - for<'a> E::Observers: Deserialize<'a>, - Z: ExecutionProcessor + EvaluatorObservers, + DI: DeserializeOwned + Input, + ICB: InputConverter::Input, From = DI>, + S: HasCorpus, { // TODO: Get around local event copy by moving handle_in_client let self_id = self.llmp.sender().id(); @@ -385,24 +346,11 @@ where } } -impl UsesState for LlmpEventConverter +impl EventFirer<::Input, S> + for LlmpEventConverter where - S: State, - SP: ShMemProvider, - IC: InputConverter, - ICB: InputConverter, - DI: Input, -{ - type State = S; -} - -impl EventFirer for LlmpEventConverter -where - S: State, - SP: ShMemProvider, - IC: InputConverter, - ICB: InputConverter, - DI: Input, + IC: InputConverter::Input>, + S: HasCorpus, { fn should_send(&self) -> bool { if let Some(throttle) = self.throttle { @@ -415,8 +363,8 @@ where #[cfg(feature = "llmp_compression")] fn fire( &mut self, - _state: &mut Self::State, - event: Event<::Input>, + _state: &mut S, + event: Event<::Input>, ) -> Result<(), Error> { if self.converter.is_none() { return Ok(()); @@ -472,8 +420,8 @@ where #[cfg(not(feature = "llmp_compression"))] fn fire( &mut self, - _state: &mut Self::State, - event: Event<::Input>, + _state: &mut S, + event: Event<::Input>, ) -> Result<(), Error> { if self.converter.is_none() { return Ok(()); diff --git a/libafl/src/events/llmp/restarting.rs b/libafl/src/events/llmp/restarting.rs index 1177e270a4..c242be8803 100644 --- a/libafl/src/events/llmp/restarting.rs +++ b/libafl/src/events/llmp/restarting.rs @@ -3,7 +3,7 @@ //! When the target crashes, a watch process (the parent) will //! restart/refork it. 
-use alloc::{boxed::Box, vec::Vec}; +use alloc::vec::Vec; #[cfg(all(unix, not(miri), feature = "std"))] use core::ptr::addr_of_mut; #[cfg(feature = "std")] @@ -29,28 +29,29 @@ use libafl_bolts::{ use libafl_bolts::{ llmp::{Broker, LlmpBroker}, shmem::ShMemProvider, - tuples::{tuple_list, Handle}, + tuples::{tuple_list, Handle, MatchNameRef}, }; -use serde::{Deserialize, Serialize}; +use serde::{de::DeserializeOwned, Serialize}; #[cfg(feature = "std")] use typed_builder::TypedBuilder; +#[cfg(feature = "std")] +use crate::events::AdaptiveSerializer; #[cfg(all(unix, feature = "std", not(miri)))] use crate::events::EVENTMGR_SIGHANDLER_STATE; -#[cfg(feature = "std")] -use crate::events::{AdaptiveSerializer, CustomBufEventResult, HasCustomBufHandlers}; use crate::{ + corpus::Corpus, events::{ - Event, EventConfig, EventFirer, EventManager, EventManagerHooksTuple, EventManagerId, - EventProcessor, EventRestarter, HasEventManagerId, LlmpEventManager, LlmpShouldSaveState, - ProgressReporter, StdLlmpEventHook, + default_maybe_report_progress, default_report_progress, serialize_observers_adaptive, + CanSerializeObserver, Event, EventConfig, EventFirer, EventManagerHooksTuple, + EventManagerId, EventProcessor, EventRestarter, HasEventManagerId, LlmpEventManager, + LlmpShouldSaveState, ManagerExit, ProgressReporter, StdLlmpEventHook, }, - executors::{Executor, HasObservers}, - fuzzer::{Evaluator, EvaluatorObservers, ExecutionProcessor}, - inputs::UsesInput, + executors::HasObservers, + inputs::Input, monitors::Monitor, - observers::{ObserversTuple, TimeObserver}, - state::{HasExecutions, HasImported, HasLastReportTime, State, UsesState}, + observers::TimeObserver, + state::{HasCorpus, HasExecutions, HasImported, HasLastReportTime, Stoppable}, Error, HasMetadata, }; @@ -59,7 +60,6 @@ use crate::{ #[derive(Debug)] pub struct LlmpRestartingEventManager where - S: State, SP: ShMemProvider, //CE: CustomEvent, { @@ -71,11 +71,21 @@ where save_state: LlmpShouldSaveState, } +#[cfg(feature = "std")] +impl CanSerializeObserver for LlmpRestartingEventManager +where + SP: ShMemProvider, + OT: Serialize + MatchNameRef, +{ + fn serialize_observers(&mut self, observers: &OT) -> Result>, Error> { + serialize_observers_adaptive::(self, observers, 2, 80) + } +} + #[cfg(feature = "std")] impl AdaptiveSerializer for LlmpRestartingEventManager where SP: ShMemProvider, - S: State, { fn serialization_time(&self) -> Duration { self.llmp_mgr.serialization_time() @@ -109,27 +119,31 @@ where } #[cfg(feature = "std")] -impl UsesState for LlmpRestartingEventManager +impl ProgressReporter for LlmpRestartingEventManager where - S: State, + S: HasExecutions + HasMetadata + HasLastReportTime, SP: ShMemProvider, { - type State = S; -} + fn maybe_report_progress( + &mut self, + state: &mut S, + monitor_timeout: Duration, + ) -> Result<(), Error> { + default_maybe_report_progress(self, state, monitor_timeout) + } -#[cfg(feature = "std")] -impl ProgressReporter for LlmpRestartingEventManager -where - S: State + HasExecutions + HasMetadata + HasLastReportTime, - SP: ShMemProvider, -{ + fn report_progress(&mut self, state: &mut S) -> Result<(), Error> { + default_report_progress(self, state) + } } #[cfg(feature = "std")] -impl EventFirer for LlmpRestartingEventManager +impl EventFirer<::Input, S> + for LlmpRestartingEventManager where SP: ShMemProvider, - S: State, + S: HasCorpus + Serialize, + <::Corpus as Corpus>::Input: Serialize, //CE: CustomEvent, { fn should_send(&self) -> bool { @@ -138,8 +152,8 @@ where fn fire( &mut self, - state: 
&mut Self::State, - event: Event<::Input>, + state: &mut S, + event: Event<::Input>, ) -> Result<(), Error> { // Check if we are going to crash in the event, in which case we store our current state for the next runner self.llmp_mgr.fire(state, event)?; @@ -147,32 +161,18 @@ where Ok(()) } - fn serialize_observers(&mut self, observers: &OT) -> Result>, Error> - where - OT: ObserversTuple + Serialize, - { - self.llmp_mgr.serialize_observers(observers) - } - fn configuration(&self) -> EventConfig { self.llmp_mgr.configuration() } } #[cfg(feature = "std")] -impl EventRestarter for LlmpRestartingEventManager +impl EventRestarter for LlmpRestartingEventManager where - S: State + HasExecutions, + S: HasExecutions, SP: ShMemProvider, //CE: CustomEvent, { - /// The llmp client needs to wait until a broker mapped all pages, before shutting down. - /// Otherwise, the OS may already have removed the shared maps, - #[inline] - fn await_restart_safe(&mut self) { - self.llmp_mgr.await_restart_safe(); - } - /// Reset the single page (we reuse it over and over from pos 0), then send the current state to the next runner. fn on_restart(&mut self, state: &mut S) -> Result<(), Error> { state.on_restart()?; @@ -192,27 +192,37 @@ where self.await_restart_safe(); Ok(()) } +} +#[cfg(feature = "std")] +impl ManagerExit for LlmpRestartingEventManager +where + SP: ShMemProvider, +{ fn send_exiting(&mut self) -> Result<(), Error> { self.staterestorer.send_exiting(); // Also inform the broker that we are about to exit. // This way, the broker can clean up the pages, and eventually exit. self.llmp_mgr.send_exiting() } + + /// The llmp client needs to wait until a broker mapped all pages, before shutting down. + /// Otherwise, the OS may already have removed the shared maps, + #[inline] + fn await_restart_safe(&mut self) { + self.llmp_mgr.await_restart_safe(); + } } #[cfg(feature = "std")] -impl EventProcessor for LlmpRestartingEventManager +impl EventProcessor for LlmpRestartingEventManager where - E: HasObservers + Executor, Z, State = S>, - E::Observers: ObserversTuple + Serialize, - for<'a> E::Observers: Deserialize<'a>, - EMH: EventManagerHooksTuple, - S: State + HasExecutions + HasMetadata + HasImported, - SP: ShMemProvider, - Z: ExecutionProcessor, E::Observers, State = S> - + EvaluatorObservers, E::Observers> - + Evaluator>, + EMH: EventManagerHooksTuple<::Input, S>, + E: HasObservers, + E::Observers: DeserializeOwned, + S: HasCorpus + HasImported + Stoppable + Serialize, + ::Input: DeserializeOwned + Input, + S::Corpus: Serialize, { fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result { let res = self.llmp_mgr.process(fuzzer, state, executor)?; @@ -225,25 +235,9 @@ where } } -#[cfg(feature = "std")] -impl EventManager for LlmpRestartingEventManager -where - E: HasObservers + Executor, Z, State = S>, - E::Observers: ObserversTuple + Serialize, - for<'a> E::Observers: Deserialize<'a>, - EMH: EventManagerHooksTuple, - S: State + HasExecutions + HasMetadata + HasLastReportTime + HasImported, - SP: ShMemProvider, - Z: ExecutionProcessor, E::Observers, State = S> - + EvaluatorObservers, E::Observers> - + Evaluator>, -{ -} - #[cfg(feature = "std")] impl HasEventManagerId for LlmpRestartingEventManager where - S: State, SP: ShMemProvider, { fn mgr_id(&self) -> EventManagerId { @@ -251,20 +245,6 @@ where } } -#[cfg(feature = "std")] -impl HasCustomBufHandlers for LlmpRestartingEventManager -where - S: State, - SP: ShMemProvider, -{ - fn add_custom_buf_handler( - &mut self, - handler: Box 
Result>, - ) { - self.llmp_mgr.add_custom_buf_handler(handler); - } -} - /// The llmp connection from the actual fuzzer to the process supervising it const _ENV_FUZZER_SENDER: &str = "_AFL_ENV_FUZZER_SENDER"; const _ENV_FUZZER_RECEIVER: &str = "_AFL_ENV_FUZZER_RECEIVER"; @@ -274,7 +254,7 @@ const _ENV_FUZZER_BROKER_CLIENT_INITIAL: &str = "_AFL_ENV_FUZZER_BROKER_CLIENT"; #[cfg(feature = "std")] impl LlmpRestartingEventManager where - S: State, + S: HasCorpus + Serialize, SP: ShMemProvider, //CE: CustomEvent, { @@ -356,7 +336,8 @@ pub fn setup_restarting_mgr_std( > where MT: Monitor + Clone, - S: State, + S: HasCorpus + Serialize + DeserializeOwned, + ::Input: Input, { RestartingMgr::builder() .shmem_provider(StdShMemProvider::new()?) @@ -389,7 +370,8 @@ pub fn setup_restarting_mgr_std_adaptive( > where MT: Monitor + Clone, - S: State, + S: HasCorpus + Serialize + DeserializeOwned, + ::Input: Input, { RestartingMgr::builder() .shmem_provider(StdShMemProvider::new()?) @@ -454,9 +436,10 @@ pub struct RestartingMgr { #[allow(clippy::type_complexity, clippy::too_many_lines)] impl RestartingMgr where - EMH: EventManagerHooksTuple + Copy + Clone, + EMH: EventManagerHooksTuple<::Input, S> + Copy + Clone, SP: ShMemProvider, - S: State, + S: HasCorpus + Serialize + DeserializeOwned, + ::Input: Input, MT: Monitor + Clone, { /// Launch the broker and the clients and fuzz @@ -489,9 +472,10 @@ where LlmpConnection::on_port(self.shmem_provider.clone(), self.broker_port)?; match connection { LlmpConnection::IsBroker { broker } => { - let llmp_hook = StdLlmpEventHook::::new( - self.monitor.take().unwrap(), - )?; + let llmp_hook = + StdLlmpEventHook::<::Input, MT>::new( + self.monitor.take().unwrap(), + )?; // Yep, broker. Just loop here. log::info!( diff --git a/libafl/src/events/mod.rs b/libafl/src/events/mod.rs index abe1e9494a..36f483bd91 100644 --- a/libafl/src/events/mod.rs +++ b/libafl/src/events/mod.rs @@ -21,7 +21,7 @@ pub use llmp::*; pub mod tcp; pub mod broker_hooks; -use alloc::{borrow::Cow, boxed::Box, string::String, vec::Vec}; +use alloc::{borrow::Cow, string::String, vec::Vec}; use core::{ fmt, hash::{BuildHasher, Hasher}, @@ -101,7 +101,9 @@ impl SignalHandler for ShutdownSignalData { } fn signals(&self) -> Vec { - vec![Signal::SigTerm, Signal::SigInterrupt, Signal::SigQuit] + static SHUTDOWN_SIGNALS: [Signal; 3] = + [Signal::SigTerm, Signal::SigInterrupt, Signal::SigQuit]; + &SHUTDOWN_SIGNALS } } @@ -266,11 +268,7 @@ where // TODO remove forward_id as not anymore needed for centralized /// Events sent around in the library #[derive(Serialize, Deserialize, Clone, Debug)] -#[serde(bound = "I: serde::de::DeserializeOwned")] -pub enum Event -where - I: Input, -{ +pub enum Event { // TODO use an ID to keep track of the original index in the sender Corpus // The sender can then use it to send Testcase metadata with CustomEvent /// A fuzzer found a new testcase. Rejoice! @@ -356,10 +354,7 @@ where },*/ } -impl Event -where - I: Input, -{ +impl Event { /// Event's corresponding name pub fn name(&self) -> &str { match self { @@ -379,7 +374,10 @@ where } /// Event's corresponding name with additional info - fn name_detailed(&self) -> Cow<'static, str> { + fn name_detailed(&self) -> Cow<'static, str> + where + I: Input, + { match self { Event::NewTestcase { input, .. } => { Cow::Owned(format!("Testcase {}", input.generate_name(None))) @@ -405,7 +403,7 @@ where } /// [`EventFirer`] fires an event. 
-pub trait EventFirer: UsesState { +pub trait EventFirer { /// Send off an [`Event`] to the broker /// /// For multi-processed managers, such as [`LlmpEventManager`], @@ -414,17 +412,13 @@ pub trait EventFirer: UsesState { /// (for example for each [`Input`], on multiple cores) /// the [`llmp`] shared map may fill up and the client will eventually OOM or [`panic`]. /// This should not happen for a normal use-case. - fn fire( - &mut self, - state: &mut Self::State, - event: Event<::Input>, - ) -> Result<(), Error>; + fn fire(&mut self, state: &mut S, event: Event) -> Result<(), Error>; /// Send off an [`Event::Log`] event to the broker. /// This is a shortcut for [`EventFirer::fire`] with [`Event::Log`] as argument. fn log( &mut self, - state: &mut Self::State, + state: &mut S, severity_level: LogSeverity, message: String, ) -> Result<(), Error> { @@ -438,14 +432,6 @@ pub trait EventFirer: UsesState { ) } - /// Serialize all observers for this type and manager - fn serialize_observers(&mut self, observers: &OT) -> Result>, Error> - where - OT: ObserversTuple<::Input, Self::State> + Serialize, - { - Ok(Some(postcard::to_allocvec(observers)?)) - } - /// Get the configuration fn configuration(&self) -> EventConfig { EventConfig::AlwaysUnique @@ -455,109 +441,193 @@ pub trait EventFirer: UsesState { fn should_send(&self) -> bool; } -/// [`ProgressReporter`] report progress to the broker. -pub trait ProgressReporter: EventFirer +/// Serialize all observers for this type and manager +/// Serialize the observer using the `time_factor` and `percentage_threshold`. +/// These parameters are unique to each of the different types of `EventManager` +pub(crate) fn serialize_observers_adaptive( + manager: &mut EM, + observers: &OT, + time_factor: u32, + percentage_threshold: usize, +) -> Result>, Error> where - Self::State: HasMetadata + HasExecutions + HasLastReportTime, + EM: AdaptiveSerializer, + OT: MatchNameRef + Serialize, { - /// Given the last time, if `monitor_timeout` seconds passed, send off an info/monitor/heartbeat message to the broker. - /// Returns the new `last` time (so the old one, unless `monitor_timeout` time has passed and monitor have been sent) - /// Will return an [`Error`], if the stats could not be sent. - fn maybe_report_progress( - &mut self, - state: &mut Self::State, - monitor_timeout: Duration, - ) -> Result<(), Error> { - let Some(last_report_time) = state.last_report_time() else { - // this is the first time we execute, no need to report progress just yet. - *state.last_report_time_mut() = Some(current_time()); - return Ok(()); - }; - let cur = current_time(); - // default to 0 here to avoid crashes on clock skew - if cur.checked_sub(*last_report_time).unwrap_or_default() > monitor_timeout { - // report_progress sets a new `last_report_time` internally. 
- self.report_progress(state)?; + match manager.time_ref() { + Some(t) => { + let exec_time = observers + .get(t) + .map(|o| o.last_runtime().unwrap_or(Duration::ZERO)) + .unwrap(); + + let mut must_ser = (manager.serialization_time() + manager.deserialization_time()) + * time_factor + < exec_time; + if must_ser { + *manager.should_serialize_cnt_mut() += 1; + } + + if manager.serializations_cnt() > 32 { + must_ser = (manager.should_serialize_cnt() * 100 / manager.serializations_cnt()) + > percentage_threshold; + } + + if manager.serialization_time() == Duration::ZERO + || must_ser + || manager.serializations_cnt().trailing_zeros() >= 8 + { + let start = current_time(); + let ser = postcard::to_allocvec(observers)?; + *manager.serialization_time_mut() = current_time() - start; + + *manager.serializations_cnt_mut() += 1; + Ok(Some(ser)) + } else { + *manager.serializations_cnt_mut() += 1; + Ok(None) + } } - Ok(()) + None => Ok(None), } +} - /// Send off an info/monitor/heartbeat message to the broker. - /// Will return an [`Error`], if the stats could not be sent. - fn report_progress(&mut self, state: &mut Self::State) -> Result<(), Error> { - let executions = *state.executions(); - let cur = current_time(); +/// Default implementation of [`ProgressReporter::maybe_report_progress`] for implementors with the +/// given constraints +pub fn default_maybe_report_progress( + reporter: &mut PR, + state: &mut S, + monitor_timeout: Duration, +) -> Result<(), Error> +where + PR: ProgressReporter, + S: HasMetadata + HasExecutions + HasLastReportTime, +{ + let Some(last_report_time) = state.last_report_time() else { + // this is the first time we execute, no need to report progress just yet. + *state.last_report_time_mut() = Some(current_time()); + return Ok(()); + }; + let cur = current_time(); + // default to 0 here to avoid crashes on clock skew + if cur.checked_sub(*last_report_time).unwrap_or_default() > monitor_timeout { + // report_progress sets a new `last_report_time` internally. + reporter.report_progress(state)?; + } + Ok(()) +} - // Default no introspection implmentation - #[cfg(not(feature = "introspection"))] - self.fire( +/// Default implementation of [`ProgressReporter::report_progress`] for implementors with the +/// given constraints +pub fn default_report_progress(reporter: &mut PR, state: &mut S) -> Result<(), Error> +where + PR: EventFirer, + S: HasExecutions + HasLastReportTime, +{ + let executions = *state.executions(); + let cur = current_time(); + + // Default no introspection implmentation + #[cfg(not(feature = "introspection"))] + reporter.fire( + state, + Event::UpdateExecStats { + executions, + time: cur, + phantom: PhantomData, + }, + )?; + + // If performance monitor are requested, fire the `UpdatePerfMonitor` event + #[cfg(feature = "introspection")] + { + state + .introspection_monitor_mut() + .set_current_time(libafl_bolts::cpu::read_time_counter()); + + // Send the current monitor over to the manager. 
This `.clone` shouldn't be + // costly as `ClientPerfMonitor` impls `Copy` since it only contains `u64`s + reporter.fire( state, - Event::UpdateExecStats { + Event::UpdatePerfMonitor { executions, time: cur, + introspection_monitor: Box::new(state.introspection_monitor().clone()), phantom: PhantomData, }, )?; + } - // If performance monitor are requested, fire the `UpdatePerfMonitor` event - #[cfg(feature = "introspection")] - { - state - .introspection_monitor_mut() - .set_current_time(libafl_bolts::cpu::read_time_counter()); - - // Send the current monitor over to the manager. This `.clone` shouldn't be - // costly as `ClientPerfMonitor` impls `Copy` since it only contains `u64`s - self.fire( - state, - Event::UpdatePerfMonitor { - executions, - time: cur, - introspection_monitor: Box::new(state.introspection_monitor().clone()), - phantom: PhantomData, - }, - )?; - } + // If we are measuring scalability stuff.. + #[cfg(feature = "scalability_introspection")] + { + let imported_with_observer = state.scalability_monitor().testcase_with_observers; + let imported_without_observer = state.scalability_monitor().testcase_without_observers; - // If we are measuring scalability stuff.. - #[cfg(feature = "scalability_introspection")] - { - let imported_with_observer = state.scalability_monitor().testcase_with_observers; - let imported_without_observer = state.scalability_monitor().testcase_without_observers; - - self.fire( - state, - Event::UpdateUserStats { - name: Cow::from("total imported"), - value: UserStats::new( - UserStatsValue::Number( - (imported_with_observer + imported_without_observer) as u64, - ), - AggregatorOps::Avg, + reporter.fire( + state, + Event::UpdateUserStats { + name: Cow::from("total imported"), + value: UserStats::new( + UserStatsValue::Number( + (imported_with_observer + imported_without_observer) as u64, ), - phantom: PhantomData, - }, - )?; - } + AggregatorOps::Avg, + ), + phantom: PhantomData, + }, + )?; + } - *state.last_report_time_mut() = Some(cur); + *state.last_report_time_mut() = Some(cur); - Ok(()) - } + Ok(()) +} + +/// [`ProgressReporter`] report progress to the broker. +pub trait ProgressReporter { + /// Given the last time, if `monitor_timeout` seconds passed, send off an info/monitor/heartbeat message to the broker. + /// Returns the new `last` time (so the old one, unless `monitor_timeout` time has passed and monitor have been sent) + /// Will return an [`Error`], if the stats could not be sent. + fn maybe_report_progress( + &mut self, + state: &mut S, + monitor_timeout: Duration, + ) -> Result<(), Error>; + + /// Send off an info/monitor/heartbeat message to the broker. + /// Will return an [`Error`], if the stats could not be sent. + fn report_progress(&mut self, state: &mut S) -> Result<(), Error>; } /// Restartable trait -pub trait EventRestarter: UsesState { +pub trait EventRestarter { /// For restarting event managers, implement a way to forward state to their next peers. /// You *must* ensure that [`HasCurrentStageId::on_restart`] will be invoked in this method, by you /// or an internal [`EventRestarter`], before the state is saved for recovery. 
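// With `ProgressReporter` reduced to two required methods, a manager without special progress
// logic simply forwards to the free-standing defaults, mirroring the delegation that
// `SimpleEventManager` uses below. This continues the hypothetical `MyEventManager` from the
// sketch above and assumes it also implements `EventFirer`, which `default_report_progress`
// needs in order to fire the stats events.
impl<S> ProgressReporter<S> for MyEventManager
where
    S: HasMetadata + HasExecutions + HasLastReportTime,
{
    fn maybe_report_progress(
        &mut self,
        state: &mut S,
        monitor_timeout: Duration,
    ) -> Result<(), Error> {
        default_maybe_report_progress(self, state, monitor_timeout)
    }

    fn report_progress(&mut self, state: &mut S) -> Result<(), Error> {
        default_report_progress(self, state)
    }
}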
- #[inline] - fn on_restart(&mut self, state: &mut Self::State) -> Result<(), Error> { - state.on_restart()?; - self.await_restart_safe(); - Ok(()) - } + fn on_restart(&mut self, state: &mut S) -> Result<(), Error>; +} + +/// Default implementation of [`EventRestarter::on_restart`] for implementors with the given +/// constraints +pub fn default_on_restart( + restarter: &mut (impl EventRestarter + ManagerExit), + state: &mut S, +) -> Result<(), Error> +where + S: HasCurrentStageId, +{ + state.on_restart()?; + restarter.await_restart_safe(); + Ok(()) +} + +pub trait CanSerializeObserver { + fn serialize_observers(&mut self, observers: &OT) -> Result>, Error>; +} +/// APIs called before exiting +pub trait ManagerExit { /// Send information that this client is exiting. /// No need to restart us any longer, and no need to print an error, either. fn send_exiting(&mut self) -> Result<(), Error> { @@ -570,19 +640,15 @@ pub trait EventRestarter: UsesState { } /// [`EventProcessor`] process all the incoming messages -pub trait EventProcessor: UsesState { +pub trait EventProcessor { /// Lookup for incoming events and process them. /// Return the number of processes events or an error - fn process( - &mut self, - fuzzer: &mut Z, - state: &mut Self::State, - executor: &mut E, - ) -> Result; + fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result; /// Shutdown gracefully; typically without saving state. fn on_shutdown(&mut self) -> Result<(), Error>; } + /// The id of this [`EventManager`]. /// For multi processed [`EventManager`]s, /// each connected client should have a unique ids. @@ -592,80 +658,39 @@ pub trait HasEventManagerId { fn mgr_id(&self) -> EventManagerId; } -/// [`EventManager`] is the main communications hub. -/// For the "normal" multi-processed mode, you may want to look into [`LlmpRestartingEventManager`] -pub trait EventManager: - EventFirer + EventProcessor + EventRestarter + HasEventManagerId + ProgressReporter -where - Self::State: HasMetadata + HasExecutions + HasLastReportTime, -{ -} - -/// The handler function for custom buffers exchanged via [`EventManager`] -type CustomBufHandlerFn = dyn FnMut(&mut S, &str, &[u8]) -> Result; - -/// Supports custom buf handlers to handle `CustomBuf` events. -pub trait HasCustomBufHandlers: UsesState { - /// Adds a custom buffer handler that will run for each incoming `CustomBuf` event. - fn add_custom_buf_handler(&mut self, handler: Box>); -} - /// An eventmgr for tests, and as placeholder if you really don't need an event manager. 
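// The restart path is now split in two: `ManagerExit` owns the exit/await plumbing, while
// `EventRestarter::on_restart` can delegate to `default_on_restart`, which flushes the current
// stage state and then waits until restarting is safe. Observer serialization becomes opt-in
// per manager through `CanSerializeObserver`. Still the hypothetical `MyEventManager` from the
// sketches above; an adaptive manager would instead route `serialize_observers` through
// `serialize_observers_adaptive`.
impl ManagerExit for MyEventManager {}

impl<S> EventRestarter<S> for MyEventManager
where
    S: HasCurrentStageId,
{
    fn on_restart(&mut self, state: &mut S) -> Result<(), Error> {
        // Persist stage progress, then block until it is safe to be restarted.
        default_on_restart(self, state)
    }
}

impl<OT> CanSerializeObserver<OT> for MyEventManager
where
    OT: Serialize,
{
    fn serialize_observers(&mut self, observers: &OT) -> Result<Option<Vec<u8>>, Error> {
        // Simplest policy: always serialize the observers with postcard.
        Ok(Some(postcard::to_allocvec(observers)?))
    }
}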
-#[derive(Copy, Clone, Debug)] -pub struct NopEventManager { - phantom: PhantomData, -} +#[derive(Copy, Clone, Debug, Default)] +pub struct NopEventManager {} -impl NopEventManager { - /// Creates a new [`NopEventManager`] - #[must_use] - pub fn new() -> Self { - NopEventManager { - phantom: PhantomData, - } +impl EventFirer for NopEventManager { + fn should_send(&self) -> bool { + true } -} -impl Default for NopEventManager { - fn default() -> Self { - Self::new() + fn fire(&mut self, _state: &mut S, _event: Event) -> Result<(), Error> { + Ok(()) } } -impl UsesState for NopEventManager -where - S: State, -{ - type State = S; -} - -impl EventFirer for NopEventManager +impl EventRestarter for NopEventManager where - S: State, + S: HasCurrentStageId, { - fn should_send(&self) -> bool { - true - } - - fn fire( - &mut self, - _state: &mut Self::State, - _event: Event<::Input>, - ) -> Result<(), Error> { - Ok(()) + fn on_restart(&mut self, state: &mut S) -> Result<(), Error> { + default_on_restart(self, state) } } -impl EventRestarter for NopEventManager where S: State {} +impl ManagerExit for NopEventManager {} -impl EventProcessor for NopEventManager +impl EventProcessor for NopEventManager where S: State + HasExecutions, { fn process( &mut self, _fuzzer: &mut Z, - _state: &mut Self::State, + _state: &mut S, _executor: &mut E, ) -> Result { Ok(0) @@ -676,30 +701,30 @@ where } } -impl EventManager for NopEventManager where - S: State + HasExecutions + HasLastReportTime + HasMetadata +impl CanSerializeObserver for NopEventManager +where + OT: Serialize, { + fn serialize_observers(&mut self, observers: &OT) -> Result>, Error> { + Ok(Some(postcard::to_allocvec(observers)?)) + } } -impl HasCustomBufHandlers for NopEventManager -where - S: State, -{ - fn add_custom_buf_handler( +impl ProgressReporter for NopEventManager { + fn maybe_report_progress( &mut self, - _handler: Box< - dyn FnMut(&mut Self::State, &str, &[u8]) -> Result, - >, - ) { + state: &mut S, + monitor_timeout: Duration, + ) -> Result<(), Error> { + Ok(()) } -} -impl ProgressReporter for NopEventManager where - S: State + HasExecutions + HasLastReportTime + HasMetadata -{ + fn report_progress(&mut self, state: &mut S) -> Result<(), Error> { + Ok(()) + } } -impl HasEventManagerId for NopEventManager { +impl HasEventManagerId for NopEventManager { fn mgr_id(&self) -> EventManagerId { EventManagerId(0) } @@ -708,79 +733,70 @@ impl HasEventManagerId for NopEventManager { /// An [`EventManager`] type that wraps another manager, but captures a `monitor` type as well. /// This is useful to keep the same API between managers with and without an internal `monitor`. #[derive(Copy, Clone, Debug)] -pub struct MonitorTypedEventManager { +pub struct MonitorTypedEventManager { inner: EM, - phantom: PhantomData, } -impl MonitorTypedEventManager { +impl MonitorTypedEventManager { /// Creates a new [`EventManager`] that wraps another manager, but captures a `monitor` type as well. 
#[must_use] pub fn new(inner: EM) -> Self { - MonitorTypedEventManager { - inner, - phantom: PhantomData, - } + MonitorTypedEventManager { inner } } } -impl UsesState for MonitorTypedEventManager +impl CanSerializeObserver for MonitorTypedEventManager where - EM: UsesState, + OT: Serialize, { - type State = EM::State; + fn serialize_observers(&mut self, observers: &OT) -> Result>, Error> { + Ok(Some(postcard::to_allocvec(observers)?)) + } } -impl EventFirer for MonitorTypedEventManager +impl EventFirer for MonitorTypedEventManager where - EM: EventFirer, + EM: EventFirer, { fn should_send(&self) -> bool { true } #[inline] - fn fire( - &mut self, - state: &mut Self::State, - event: Event<::Input>, - ) -> Result<(), Error> { + fn fire(&mut self, state: &mut S, event: Event) -> Result<(), Error> { self.inner.fire(state, event) } #[inline] fn log( &mut self, - state: &mut Self::State, + state: &mut S, severity_level: LogSeverity, message: String, ) -> Result<(), Error> { self.inner.log(state, severity_level, message) } - #[inline] - fn serialize_observers(&mut self, observers: &OT) -> Result>, Error> - where - OT: ObserversTuple<::Input, Self::State> + Serialize, - { - self.inner.serialize_observers(observers) - } - #[inline] fn configuration(&self) -> EventConfig { self.inner.configuration() } } -impl EventRestarter for MonitorTypedEventManager +impl EventRestarter for MonitorTypedEventManager where - EM: EventRestarter, + EM: EventRestarter, { #[inline] - fn on_restart(&mut self, state: &mut Self::State) -> Result<(), Error> { + fn on_restart(&mut self, state: &mut S) -> Result<(), Error> { self.inner.on_restart(state) } +} +impl ManagerExit for MonitorTypedEventManager +where + EM: ManagerExit, +{ #[inline] fn send_exiting(&mut self) -> Result<(), Error> { self.inner.send_exiting() @@ -792,17 +808,12 @@ where } } -impl EventProcessor for MonitorTypedEventManager +impl EventProcessor for MonitorTypedEventManager where - EM: EventProcessor, + EM: EventProcessor, { #[inline] - fn process( - &mut self, - fuzzer: &mut Z, - state: &mut Self::State, - executor: &mut E, - ) -> Result { + fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result { self.inner.process(fuzzer, state, executor) } @@ -811,51 +822,26 @@ where } } -impl EventManager for MonitorTypedEventManager -where - EM: EventManager, - Self::State: HasLastReportTime + HasExecutions + HasMetadata, -{ -} - -impl HasCustomBufHandlers for MonitorTypedEventManager -where - Self: UsesState, - EM: HasCustomBufHandlers, -{ - #[inline] - fn add_custom_buf_handler( - &mut self, - handler: Box< - dyn FnMut(&mut Self::State, &str, &[u8]) -> Result, - >, - ) { - self.inner.add_custom_buf_handler(handler); - } -} - -impl ProgressReporter for MonitorTypedEventManager +impl ProgressReporter for MonitorTypedEventManager where - Self: UsesState, - EM: ProgressReporter, - Self::State: HasLastReportTime + HasExecutions + HasMetadata, + EM: ProgressReporter, { #[inline] fn maybe_report_progress( &mut self, - state: &mut Self::State, + state: &mut S, monitor_timeout: Duration, ) -> Result<(), Error> { self.inner.maybe_report_progress(state, monitor_timeout) } #[inline] - fn report_progress(&mut self, state: &mut Self::State) -> Result<(), Error> { + fn report_progress(&mut self, state: &mut S) -> Result<(), Error> { self.inner.report_progress(state) } } -impl HasEventManagerId for MonitorTypedEventManager +impl HasEventManagerId for MonitorTypedEventManager where EM: HasEventManagerId, { @@ -887,56 +873,6 @@ pub trait 
AdaptiveSerializer { /// A [`Handle`] to the time observer to determine the `time_factor` fn time_ref(&self) -> &Option>; - - /// Serialize the observer using the `time_factor` and `percentage_threshold`. - /// These parameters are unique to each of the different types of `EventManager` - fn serialize_observers_adaptive( - &mut self, - observers: &OT, - time_factor: u32, - percentage_threshold: usize, - ) -> Result>, Error> - where - OT: ObserversTuple + Serialize, - S: UsesInput, - { - match self.time_ref() { - Some(t) => { - let exec_time = observers - .get(t) - .map(|o| o.last_runtime().unwrap_or(Duration::ZERO)) - .unwrap(); - - let mut must_ser = (self.serialization_time() + self.deserialization_time()) - * time_factor - < exec_time; - if must_ser { - *self.should_serialize_cnt_mut() += 1; - } - - if self.serializations_cnt() > 32 { - must_ser = (self.should_serialize_cnt() * 100 / self.serializations_cnt()) - > percentage_threshold; - } - - if self.serialization_time() == Duration::ZERO - || must_ser - || self.serializations_cnt().trailing_zeros() >= 8 - { - let start = current_time(); - let ser = postcard::to_allocvec(observers)?; - *self.serialization_time_mut() = current_time() - start; - - *self.serializations_cnt_mut() += 1; - Ok(Some(ser)) - } else { - *self.serializations_cnt_mut() += 1; - Ok(None) - } - } - None => Ok(None), - } - } } #[cfg(test)] diff --git a/libafl/src/events/simple.rs b/libafl/src/events/simple.rs index b76d78546c..e6805964af 100644 --- a/libafl/src/events/simple.rs +++ b/libafl/src/events/simple.rs @@ -1,9 +1,9 @@ //! A very simple event manager, that just supports log outputs, but no multiprocessing -use alloc::{boxed::Box, vec::Vec}; +use alloc::vec::Vec; +use core::fmt::Debug; #[cfg(all(unix, not(miri), feature = "std"))] use core::ptr::addr_of_mut; -use core::{fmt::Debug, marker::PhantomData}; #[cfg(feature = "std")] use core::{ sync::atomic::{compiler_fence, Ordering}, @@ -22,23 +22,25 @@ use libafl_bolts::{os::CTRL_C_EXIT, shmem::ShMemProvider, staterestore::StateRes #[cfg(feature = "std")] use serde::{de::DeserializeOwned, Serialize}; -use super::{CustomBufEventResult, CustomBufHandlerFn, HasCustomBufHandlers, ProgressReporter}; +use super::{ + default_maybe_report_progress, default_report_progress, CanSerializeObserver, ProgressReporter, +}; #[cfg(all(unix, feature = "std", not(miri)))] use crate::events::EVENTMGR_SIGHANDLER_STATE; use crate::{ + corpus::Corpus, events::{ - BrokerEventResult, Event, EventFirer, EventManager, EventManagerId, EventProcessor, - EventRestarter, HasEventManagerId, + BrokerEventResult, Event, EventFirer, EventManagerId, EventProcessor, EventRestarter, + HasEventManagerId, ManagerExit, }, - inputs::UsesInput, monitors::Monitor, - state::{HasExecutions, HasLastReportTime, State, Stoppable, UsesState}, + state::{HasExecutions, HasLastReportTime, Stoppable}, Error, HasMetadata, }; #[cfg(feature = "std")] use crate::{ monitors::{ClientStats, SimplePrintingMonitor}, - state::{HasCorpus, HasSolutions}, + state::HasCorpus, }; /// The llmp connection from the actual fuzzer to the process supervising it @@ -50,42 +52,32 @@ const _ENV_FUZZER_BROKER_CLIENT_INITIAL: &str = "_AFL_ENV_FUZZER_BROKER_CLIENT"; /// A simple, single-threaded event manager that just logs pub struct SimpleEventManager where - S: UsesInput + Stoppable, + S: HasCorpus, { /// The monitor monitor: MT, /// The events that happened since the last `handle_in_broker` - events: Vec>, - /// The custom buf handler - custom_buf_handlers: Vec>>, - phantom: PhantomData, + 
events: Vec::Input>>, } impl Debug for SimpleEventManager where + S: HasCorpus, + ::Input: Debug, MT: Debug, - S: UsesInput + Stoppable, { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { f.debug_struct("SimpleEventManager") - //.field("custom_buf_handlers", self.custom_buf_handlers) .field("monitor", &self.monitor) .field("events", &self.events) .finish_non_exhaustive() } } -impl UsesState for SimpleEventManager -where - S: State, -{ - type State = S; -} - -impl EventFirer for SimpleEventManager +impl EventFirer<::Input, S> for SimpleEventManager where MT: Monitor, - S: State, + S: HasCorpus, { fn should_send(&self) -> bool { true @@ -93,8 +85,8 @@ where fn fire( &mut self, - _state: &mut Self::State, - event: Event<::Input>, + _state: &mut S, + event: Event<::Input>, ) -> Result<(), Error> { match Self::handle_in_broker(&mut self.monitor, &event)? { BrokerEventResult::Forward => self.events.push(event), @@ -104,17 +96,13 @@ where } } -impl EventRestarter for SimpleEventManager -where - MT: Monitor, - S: State, -{ -} +impl ManagerExit for SimpleEventManager where S: HasCorpus {} -impl EventProcessor for SimpleEventManager +impl EventProcessor for SimpleEventManager where + S: HasCorpus + Stoppable, + ::Input: Debug, MT: Monitor, - S: State, { fn process( &mut self, @@ -134,40 +122,37 @@ where } } -impl EventManager for SimpleEventManager +impl CanSerializeObserver for SimpleEventManager where - MT: Monitor, - S: State + HasExecutions + HasLastReportTime + HasMetadata, + S: HasCorpus, + OT: Serialize, { + fn serialize_observers(&mut self, observers: &OT) -> Result>, Error> { + Ok(Some(postcard::to_allocvec(observers)?)) + } } -impl HasCustomBufHandlers for SimpleEventManager +impl ProgressReporter for SimpleEventManager where - MT: Monitor, //CE: CustomEvent, - S: State, + MT: Monitor, + S: HasCorpus + HasExecutions + HasMetadata + HasLastReportTime, { - /// Adds a custom buffer handler that will run for each incoming `CustomBuf` event. - fn add_custom_buf_handler( + fn maybe_report_progress( &mut self, - handler: Box< - dyn FnMut(&mut Self::State, &str, &[u8]) -> Result, - >, - ) { - self.custom_buf_handlers.push(handler); + state: &mut S, + monitor_timeout: Duration, + ) -> Result<(), Error> { + default_maybe_report_progress(self, state, monitor_timeout) } -} -impl ProgressReporter for SimpleEventManager -where - MT: Monitor, - S: State + HasExecutions + HasMetadata + HasLastReportTime, -{ + fn report_progress(&mut self, state: &mut S) -> Result<(), Error> { + default_report_progress(self, state) + } } impl HasEventManagerId for SimpleEventManager where - MT: Monitor, - S: UsesInput + Stoppable, + S: HasCorpus, { fn mgr_id(&self) -> EventManagerId { EventManagerId(0) @@ -177,7 +162,7 @@ where #[cfg(feature = "std")] impl SimpleEventManager where - S: UsesInput + Stoppable, + S: HasCorpus, { /// Creates a [`SimpleEventManager`] that just prints to `stdout`. #[must_use] @@ -188,16 +173,14 @@ where impl SimpleEventManager where - MT: Monitor, //TODO CE: CustomEvent, - S: UsesInput + Stoppable, + S: HasCorpus, + MT: Monitor, { /// Creates a new [`SimpleEventManager`]. pub fn new(monitor: MT) -> Self { Self { monitor, events: vec![], - custom_buf_handlers: vec![], - phantom: PhantomData, } } @@ -205,7 +188,7 @@ where #[allow(clippy::unnecessary_wraps)] fn handle_in_broker( monitor: &mut MT, - event: &Event, + event: &Event<::Input>, ) -> Result { match event { Event::NewTestcase { corpus_size, .. 
} => { @@ -276,20 +259,22 @@ where // Handle arriving events in the client #[allow(clippy::needless_pass_by_value, clippy::unused_self)] - fn handle_in_client(&mut self, state: &mut S, event: Event) -> Result<(), Error> { + fn handle_in_client( + &mut self, + state: &mut S, + event: Event<::Input>, + ) -> Result<(), Error> + where + S: Stoppable, + ::Input: Debug, + { match event { - Event::CustomBuf { buf, tag } => { - for handler in &mut self.custom_buf_handlers { - handler(state, &tag, &buf)?; - } - Ok(()) - } Event::Stop => { state.request_stop(); Ok(()) } _ => Err(Error::unknown(format!( - "Received illegal message that message should not have arrived: {event:?}." + "Received illegal message that message should not have arrived: {event:?}" ))), } } @@ -302,32 +287,36 @@ where /// `restarter` will start a new process each time the child crashes or times out. #[cfg(feature = "std")] #[allow(clippy::default_trait_access)] -#[derive(Debug)] pub struct SimpleRestartingEventManager where - S: UsesInput + Stoppable, SP: ShMemProvider, //CE: CustomEvent, + S: HasCorpus, { /// The actual simple event mgr - simple_event_mgr: SimpleEventManager, + inner: SimpleEventManager, /// [`StateRestorer`] for restarts staterestorer: StateRestorer, } -#[cfg(feature = "std")] -impl UsesState for SimpleRestartingEventManager +impl Debug for SimpleRestartingEventManager where - S: State, + S: HasCorpus, SP: ShMemProvider, + ::Input: Debug, + MT: Debug, { - type State = S; + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("SimpleRestartingEventManager") + //.field("handlers", self.handlers) + .field("inner", &self.inner) + .finish_non_exhaustive() + } } #[cfg(feature = "std")] -impl EventFirer for SimpleRestartingEventManager +impl EventFirer for SimpleRestartingEventManager where - MT: Monitor, - S: State, + S: HasCorpus, SP: ShMemProvider, { fn should_send(&self) -> bool { @@ -336,18 +325,18 @@ where fn fire( &mut self, - _state: &mut Self::State, - event: Event<::Input>, + state: &mut S, + event: Event<::Input>, ) -> Result<(), Error> { - self.simple_event_mgr.fire(_state, event) + self.inner.fire(state, event) } } #[cfg(feature = "std")] -impl EventRestarter for SimpleRestartingEventManager +impl EventRestarter for SimpleRestartingEventManager where MT: Monitor, - S: State, + S: HasCorpus, SP: ShMemProvider, { /// Reset the single page (we reuse it over and over from pos 0), then send the current state to the next runner. 
@@ -358,79 +347,76 @@ where
         self.staterestorer.reset();
         self.staterestorer.save(&(
             state,
-            self.simple_event_mgr.monitor.start_time(),
-            self.simple_event_mgr.monitor.client_stats(),
+            self.inner.monitor.start_time(),
+            self.inner.monitor.client_stats(),
         ))
     }
-
-    fn send_exiting(&mut self) -> Result<(), Error> {
-        self.staterestorer.send_exiting();
-        Ok(())
-    }
 }

-#[cfg(feature = "std")]
-impl EventProcessor for SimpleRestartingEventManager
+impl CanSerializeObserver for SimpleRestartingEventManager
 where
-    MT: Monitor,
-    S: State + HasExecutions,
     SP: ShMemProvider,
+    S: HasCorpus,
+    OT: Serialize,
 {
-    fn process(
-        &mut self,
-        fuzzer: &mut Z,
-        state: &mut Self::State,
-        executor: &mut E,
-    ) -> Result {
-        self.simple_event_mgr.process(fuzzer, state, executor)
-    }
-    fn on_shutdown(&mut self) -> Result<(), Error> {
-        self.send_exiting()
+    fn serialize_observers(&mut self, observers: &OT) -> Result<Option<Vec<u8>>, Error> {
+        Ok(Some(postcard::to_allocvec(observers)?))
     }
 }

 #[cfg(feature = "std")]
-impl EventManager for SimpleRestartingEventManager
+impl ManagerExit for SimpleRestartingEventManager
 where
-    MT: Monitor,
-    S: State + HasExecutions + HasMetadata + HasLastReportTime + Serialize,
     SP: ShMemProvider,
+    S: HasCorpus,
 {
+    fn send_exiting(&mut self) -> Result<(), Error> {
+        self.staterestorer.send_exiting();
+        Ok(())
+    }
 }

 #[cfg(feature = "std")]
-impl HasCustomBufHandlers for SimpleRestartingEventManager
+impl EventProcessor for SimpleRestartingEventManager
 where
-    MT: Monitor,
-    S: State,
+    S: HasCorpus,
     SP: ShMemProvider,
 {
-    fn add_custom_buf_handler(
-        &mut self,
-        handler: Box Result>,
-    ) {
-        self.simple_event_mgr.add_custom_buf_handler(handler);
+    fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result<usize, Error> {
+        self.inner.process(fuzzer, state, executor)
+    }
+    fn on_shutdown(&mut self) -> Result<(), Error> {
+        self.send_exiting()
     }
 }

 #[cfg(feature = "std")]
-impl ProgressReporter for SimpleRestartingEventManager
+impl ProgressReporter for SimpleRestartingEventManager
 where
-    MT: Monitor,
-    S: State + HasExecutions + HasMetadata + HasLastReportTime,
+    S: HasCorpus,
     SP: ShMemProvider,
 {
+    fn maybe_report_progress(
+        &mut self,
+        state: &mut S,
+        monitor_timeout: Duration,
+    ) -> Result<(), Error> {
+        default_maybe_report_progress(self, state, monitor_timeout)
+    }
+
+    fn report_progress(&mut self, state: &mut S) -> Result<(), Error> {
+        default_report_progress(self, state)
+    }
 }

 #[cfg(feature = "std")]
 impl HasEventManagerId for SimpleRestartingEventManager
 where
-    MT: Monitor,
-    S: UsesInput + Stoppable,
+    S: HasCorpus,
     SP: ShMemProvider,
 {
     fn mgr_id(&self) -> EventManagerId {
-        self.simple_event_mgr.mgr_id()
+        self.inner.mgr_id()
     }
 }

@@ -438,15 +424,14 @@ where
 #[allow(clippy::type_complexity, clippy::too_many_lines)]
 impl SimpleRestartingEventManager
 where
-    S: UsesInput + Stoppable,
+    S: HasCorpus,
     SP: ShMemProvider,
-    MT: Monitor, //TODO CE: CustomEvent,
 {
     /// Creates a new [`SimpleEventManager`].
fn launched(monitor: MT, staterestorer: StateRestorer) -> Self { Self { staterestorer, - simple_event_mgr: SimpleEventManager::new(monitor), + inner: SimpleEventManager::new(monitor), } } @@ -456,8 +441,8 @@ where #[allow(clippy::similar_names)] pub fn launch(mut monitor: MT, shmem_provider: &mut SP) -> Result<(Option, Self), Error> where - S: DeserializeOwned + Serialize + HasCorpus + HasSolutions, - MT: Debug, + S: DeserializeOwned, + MT: Monitor, { // We start ourself as child process to actually fuzz let mut staterestorer = if std::env::var(_ENV_FUZZER_SENDER).is_err() {