Skip to content

Commit

Permalink
feat: support for unbuffered output
Browse files Browse the repository at this point in the history
  • Loading branch information
qjerome committed Nov 25, 2024
1 parent 9b9c24a commit 0ab8931
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 35 deletions.
46 changes: 27 additions & 19 deletions kunai/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use tokio::{task, time};

use kunai::cache::*;

use kunai::config::{Config, FileSettings};
use kunai::config::{self, Config};
use kunai::util::namespaces::{unshare, Namespace};
use kunai::util::*;

Expand Down Expand Up @@ -261,7 +261,7 @@ struct EventConsumer<'s> {

impl<'s> EventConsumer<'s> {
fn prepare_output(config: &Config) -> anyhow::Result<Output> {
let output = match &config.output.as_str() {
let output = match &config.output.path.as_str() {
&"stdout" => String::from("/dev/stdout"),
&"stderr" => String::from("/dev/stderr"),
v => v.to_string(),
Expand All @@ -282,19 +282,20 @@ impl<'s> EventConsumer<'s> {
}
}

match config.output_settings.as_ref() {
Some(s) => firo::OpenOptions::new()
.mode(0o600)
.max_size(s.max_size)
.trigger(s.rotate_size.into())
.compression(firo::Compression::Gzip)
.create_append(v)?
.into(),
None => firo::OpenOptions::new()
.mode(0o600)
.create_append(v)?
.into(),
let mut opts = firo::OpenOptions::new();

opts.mode(0o600);

if let Some(max_size) = config.output.max_size {
opts.max_size(max_size);
}

if let Some(rotate_size) = config.output.rotate_size {
opts.trigger(rotate_size.into());
opts.compression(firo::Compression::Gzip);
}

opts.create_append(v)?.into()
}
};
Ok(out)
Expand Down Expand Up @@ -1709,6 +1710,12 @@ impl<'s> EventConsumer<'s> {
match serde_json::to_string(event) {
Ok(ser) => {
writeln!(self.output, "{ser}").expect("failed to write json event");
// if output is unbuffered we flush it
// unbuffered output allow to have logs written in near
// real-time into output file
if !self.config.output.buffered {
self.output.flush().expect("failed to flush output");
}
return true;
}
Err(e) => error!("failed to serialize event to json: {e}"),
Expand Down Expand Up @@ -3088,10 +3095,11 @@ WantedBy=sysinit.target"#,
let conf = Config::default()
.harden(co.harden)
.generate_host_uuid()
.output(log_path)
.output_settings(FileSettings {
rotate_size: huby::ByteSize::from_mb(10),
max_size: huby::ByteSize::from_gb(1),
.output(config::Output {
path: log_path.to_string_lossy().to_string(),
rotate_size: Some(huby::ByteSize::from_mb(10)),
max_size: Some(huby::ByteSize::from_gb(1)),
buffered: false,
});
println!(
"Writing configuration file: {}",
Expand Down Expand Up @@ -3131,7 +3139,7 @@ WantedBy=sysinit.target"#,
)
.map_err(|e| anyhow!("failed to parse config file: {e}"))?;

PathBuf::from(config.output)
PathBuf::from(config.output.path)
} else {
// cannot panic as it is Some
o.log_file.unwrap()
Expand Down
37 changes: 21 additions & 16 deletions kunai/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use kunai_common::{
config::{BpfConfig, Filter, Loader},
};
use serde::{Deserialize, Serialize};
use std::{collections::BTreeMap, fs, path::Path};
use std::{collections::BTreeMap, fs};
use thiserror::Error;

pub const DEFAULT_SEND_DATA_MIN_LEN: u64 = 256;
Expand Down Expand Up @@ -41,9 +41,11 @@ impl Event {
}

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct FileSettings {
pub rotate_size: ByteSize,
pub max_size: ByteSize,
pub struct Output {
pub path: String,
pub rotate_size: Option<ByteSize>,
pub max_size: Option<ByteSize>,
pub buffered: bool,
}

#[derive(Debug, Clone, Deserialize, Serialize)]
Expand All @@ -59,12 +61,11 @@ pub struct Scanner {
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Config {
host_uuid: Option<uuid::Uuid>,
pub output: String,
pub output_settings: Option<FileSettings>,
pub max_buffered_events: u16,
pub workers: Option<usize>,
pub send_data_min_len: Option<u64>,
pub harden: bool,
pub output: Output,
pub scanner: Scanner,
pub events: BTreeMap<bpf_events::Type, Event>,
}
Expand All @@ -86,8 +87,12 @@ impl Default for Config {

Self {
host_uuid: None,
output: "/dev/stdout".into(),
output_settings: None,
output: Output {
path: "/dev/stdout".into(),
max_size: None,
rotate_size: None,
buffered: false,
},
max_buffered_events: DEFAULT_MAX_BUFFERED_EVENTS,
workers: None,
send_data_min_len: None,
Expand Down Expand Up @@ -137,18 +142,18 @@ impl Config {
self
}

pub fn output<P: AsRef<Path>>(mut self, p: P) -> Self {
self.output = p.as_ref().to_string_lossy().to_string();
self
}

pub fn output_settings(mut self, s: FileSettings) -> Self {
self.output_settings = Some(s);
pub fn output(mut self, o: Output) -> Self {
self.output = o;
self
}

pub fn stdout_output(mut self) -> Self {
self.output = "stdout".into();
self.output = Output {
path: "stdout".into(),
max_size: None,
rotate_size: None,
buffered: false,
};
self
}

Expand Down

0 comments on commit 0ab8931

Please sign in to comment.