Skip to content

Commit

Permalink
fix event processing bug leaving always one event in queue
Browse files Browse the repository at this point in the history
Signed-off-by: Quentin JEROME <[email protected]>
  • Loading branch information
qjerome committed Oct 23, 2023
1 parent 76d5ff6 commit 9aedaba
Showing 1 changed file with 30 additions and 8 deletions.
38 changes: 30 additions & 8 deletions kunai/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,7 @@ impl EventProcessor {
#[inline(always)]
fn output_json(&mut self, j: JsonValue) {
writeln!(self.output, "{j}").expect("failed to write json event");
std::io::stdout().flush().expect("failed to flush output");
}

fn handle_event(&mut self, enc_event: &mut EncodedEvent) {
Expand Down Expand Up @@ -745,6 +746,9 @@ impl EventProcessor {
// this event is used for correlation but cannot be processed
// asynchronously so we have to handle correlation here
self.handle_correlation_event(std_info.clone(), &CorrelationEvent::from(e));
// we have to rebuild std_info as it has it is uses correlation
// information
let std_info = self.build_std_event_info(*i);
let e = self.json_execve(std_info, e);
self.output_json(e);
}
Expand All @@ -757,6 +761,9 @@ impl EventProcessor {
// this event is used for correlation but cannot be processed
// asynchronously so we have to handle correlation here
self.handle_correlation_event(std_info.clone(), &CorrelationEvent::from(e));
// we have to rebuild std_info as it has it is uses correlation
// information
let std_info = self.build_std_event_info(*i);
let e = self.json_clone(std_info, e);
self.output_json(e);
}
Expand Down Expand Up @@ -935,7 +942,7 @@ impl EventReader {
}

// Event ordering is a very important piece as it impacts on-host correlations.
// Additionaly it is very useful as it guarantees events are printed/piped in to
// Additionaly it is very useful as it guarantees events are printed/piped into
// other tools in the damn good order.
//
// Ordering correctness relies on two factors
Expand Down Expand Up @@ -967,7 +974,7 @@ impl EventReader {
// we find the last event corresponding to previous batch
// if we cannot find one it means all events are of the current batch
// so we should not process any event (satisfies condition 2)
let mut count = self
let index_first = self
.pipe
.iter()
.enumerate()
Expand All @@ -981,9 +988,12 @@ impl EventReader {
.map(|(i, _)| i)
.unwrap_or_default();

// converts index into a counter
let mut counter = index_first + 1;

// processing count piped events, we need to pop front as events
// are sorted ascending by timestamp
while count > 0 {
while counter > 0 {
// at this point pop_front cannot fail as count takes account of the elements in the pipe
let enc_evt = self
.pipe
Expand All @@ -993,7 +1003,7 @@ impl EventReader {
// send event to event processor
self.sender.send(enc_evt).unwrap();

count -= 1;
counter -= 1;
}
}

Expand Down Expand Up @@ -1048,6 +1058,13 @@ impl EventReader {
let i = unsafe { e.info() }.unwrap();

match i.etype {
Type::Execve | Type::ExecveScript => {
let event = event!(e, ExecveEvent).unwrap();
for e in HashEvent::all_from_execve(event) {
self.send_event(e).unwrap()
}
}

Type::MmapExec => {
let event = event!(e, MmapExecEvent).unwrap();
self.send_event(HashEvent::from(event)).unwrap();
Expand Down Expand Up @@ -1091,9 +1108,6 @@ impl EventReader {

// process each perf buffer in a separate task
task::spawn(async move {
// only the reducer thread will be allowed to switch between namespaces
if cpu_id == reducer_cpu_id {}

// the number of buffers we want to use gives us the number of events we can read
// in one go in userland
let mut buffers = (0..conf.max_buffered_events)
Expand Down Expand Up @@ -1175,7 +1189,7 @@ impl EventReader {
.expect("info should not fail here")
.etype;

//filtering out unwanted events
// filtering out unwanted events
if !er.filter.is_enabled(etype) {
continue;
}
Expand Down Expand Up @@ -1245,6 +1259,9 @@ struct Cli {

#[arg(short, long, action = clap::ArgAction::Count, help="Set verbosity level, repeat option for more verbosity.")]
verbose: u8,

#[arg(short, long)]
silent: bool,
}

// todo: make single-threaded / multi-threaded available in configuration
Expand All @@ -1265,6 +1282,11 @@ async fn main() -> Result<(), anyhow::Error> {
_ => {}
}

// silent out logging if specified in CLI
if cli.silent {
log_level = LevelFilter::Off;
}

let mut verifier_level = match std::env::var("VERIFIER_LOG_LEVEL") {
Ok(s) => match s.as_str() {
"debug" => VerifierLogLevel::DEBUG,
Expand Down

0 comments on commit 9aedaba

Please sign in to comment.