From 0ab893129c2e34f9adff746e0c709dad82ae0157 Mon Sep 17 00:00:00 2001 From: qjerome Date: Mon, 25 Nov 2024 15:06:26 +0100 Subject: [PATCH] feat: support for unbuffered output --- kunai/src/bin/main.rs | 46 +++++++++++++++++++++++++------------------ kunai/src/config.rs | 37 +++++++++++++++++++--------------- 2 files changed, 48 insertions(+), 35 deletions(-) diff --git a/kunai/src/bin/main.rs b/kunai/src/bin/main.rs index 045d88b..de16382 100644 --- a/kunai/src/bin/main.rs +++ b/kunai/src/bin/main.rs @@ -68,7 +68,7 @@ use tokio::{task, time}; use kunai::cache::*; -use kunai::config::{Config, FileSettings}; +use kunai::config::{self, Config}; use kunai::util::namespaces::{unshare, Namespace}; use kunai::util::*; @@ -261,7 +261,7 @@ struct EventConsumer<'s> { impl<'s> EventConsumer<'s> { fn prepare_output(config: &Config) -> anyhow::Result { - let output = match &config.output.as_str() { + let output = match &config.output.path.as_str() { &"stdout" => String::from("/dev/stdout"), &"stderr" => String::from("/dev/stderr"), v => v.to_string(), @@ -282,19 +282,20 @@ impl<'s> EventConsumer<'s> { } } - match config.output_settings.as_ref() { - Some(s) => firo::OpenOptions::new() - .mode(0o600) - .max_size(s.max_size) - .trigger(s.rotate_size.into()) - .compression(firo::Compression::Gzip) - .create_append(v)? - .into(), - None => firo::OpenOptions::new() - .mode(0o600) - .create_append(v)? - .into(), + let mut opts = firo::OpenOptions::new(); + + opts.mode(0o600); + + if let Some(max_size) = config.output.max_size { + opts.max_size(max_size); + } + + if let Some(rotate_size) = config.output.rotate_size { + opts.trigger(rotate_size.into()); + opts.compression(firo::Compression::Gzip); } + + opts.create_append(v)?.into() } }; Ok(out) @@ -1709,6 +1710,12 @@ impl<'s> EventConsumer<'s> { match serde_json::to_string(event) { Ok(ser) => { writeln!(self.output, "{ser}").expect("failed to write json event"); + // if output is unbuffered we flush it + // unbuffered output allow to have logs written in near + // real-time into output file + if !self.config.output.buffered { + self.output.flush().expect("failed to flush output"); + } return true; } Err(e) => error!("failed to serialize event to json: {e}"), @@ -3088,10 +3095,11 @@ WantedBy=sysinit.target"#, let conf = Config::default() .harden(co.harden) .generate_host_uuid() - .output(log_path) - .output_settings(FileSettings { - rotate_size: huby::ByteSize::from_mb(10), - max_size: huby::ByteSize::from_gb(1), + .output(config::Output { + path: log_path.to_string_lossy().to_string(), + rotate_size: Some(huby::ByteSize::from_mb(10)), + max_size: Some(huby::ByteSize::from_gb(1)), + buffered: false, }); println!( "Writing configuration file: {}", @@ -3131,7 +3139,7 @@ WantedBy=sysinit.target"#, ) .map_err(|e| anyhow!("failed to parse config file: {e}"))?; - PathBuf::from(config.output) + PathBuf::from(config.output.path) } else { // cannot panic as it is Some o.log_file.unwrap() diff --git a/kunai/src/config.rs b/kunai/src/config.rs index e6b1b75..deee2b9 100644 --- a/kunai/src/config.rs +++ b/kunai/src/config.rs @@ -4,7 +4,7 @@ use kunai_common::{ config::{BpfConfig, Filter, Loader}, }; use serde::{Deserialize, Serialize}; -use std::{collections::BTreeMap, fs, path::Path}; +use std::{collections::BTreeMap, fs}; use thiserror::Error; pub const DEFAULT_SEND_DATA_MIN_LEN: u64 = 256; @@ -41,9 +41,11 @@ impl Event { } #[derive(Debug, Clone, Deserialize, Serialize)] -pub struct FileSettings { - pub rotate_size: ByteSize, - pub max_size: ByteSize, +pub struct Output { + pub path: String, + pub rotate_size: Option, + pub max_size: Option, + pub buffered: bool, } #[derive(Debug, Clone, Deserialize, Serialize)] @@ -59,12 +61,11 @@ pub struct Scanner { #[derive(Debug, Clone, Deserialize, Serialize)] pub struct Config { host_uuid: Option, - pub output: String, - pub output_settings: Option, pub max_buffered_events: u16, pub workers: Option, pub send_data_min_len: Option, pub harden: bool, + pub output: Output, pub scanner: Scanner, pub events: BTreeMap, } @@ -86,8 +87,12 @@ impl Default for Config { Self { host_uuid: None, - output: "/dev/stdout".into(), - output_settings: None, + output: Output { + path: "/dev/stdout".into(), + max_size: None, + rotate_size: None, + buffered: false, + }, max_buffered_events: DEFAULT_MAX_BUFFERED_EVENTS, workers: None, send_data_min_len: None, @@ -137,18 +142,18 @@ impl Config { self } - pub fn output>(mut self, p: P) -> Self { - self.output = p.as_ref().to_string_lossy().to_string(); - self - } - - pub fn output_settings(mut self, s: FileSettings) -> Self { - self.output_settings = Some(s); + pub fn output(mut self, o: Output) -> Self { + self.output = o; self } pub fn stdout_output(mut self) -> Self { - self.output = "stdout".into(); + self.output = Output { + path: "stdout".into(), + max_size: None, + rotate_size: None, + buffered: false, + }; self }