diff --git a/Cargo.lock b/Cargo.lock index e9d1af1..3d73ae8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -108,17 +108,6 @@ version = "1.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" -[[package]] -name = "async-trait" -version = "0.1.73" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.32", -] - [[package]] name = "autocfg" version = "1.1.0" @@ -378,25 +367,6 @@ dependencies = [ "libc", ] -[[package]] -name = "crossbeam-channel" -version = "0.5.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" -dependencies = [ - "cfg-if", - "crossbeam-utils", -] - -[[package]] -name = "crossbeam-utils" -version = "0.8.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" -dependencies = [ - "cfg-if", -] - [[package]] name = "crypto-auditing" version = "0.1.0" @@ -409,11 +379,11 @@ dependencies = [ "serde", "serde_cbor", "serde_with", - "tarpc", "thiserror", "tokio", "tokio-serde", "tokio-stream", + "tokio-util", "tracing", ] @@ -473,10 +443,11 @@ dependencies = [ "inotify", "libsystemd", "serde_cbor", - "tarpc", + "tempfile", "tokio", "tokio-serde", "tokio-stream", + "tokio-util", "toml", "tracing", "tracing-subscriber", @@ -745,17 +716,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "getrandom" -version = "0.2.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" -dependencies = [ - "cfg-if", - "libc", - "wasi", -] - [[package]] name = "gimli" version = "0.28.0" @@ -822,12 +782,6 @@ dependencies = [ "windows-sys", ] -[[package]] -name = "humantime" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" - [[package]] name = "iana-time-zone" version = "0.1.57" @@ -1258,49 +1212,6 @@ dependencies = [ "vcpkg", ] -[[package]] -name = "opentelemetry" -version = "0.18.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69d6c3d7288a106c0a363e4b0e8d308058d56902adefb16f4936f417ffef086e" -dependencies = [ - "opentelemetry_api", - "opentelemetry_sdk", -] - -[[package]] -name = "opentelemetry_api" -version = "0.18.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c24f96e21e7acc813c7a8394ee94978929db2bcc46cf6b5014fc612bf7760c22" -dependencies = [ - "futures-channel", - "futures-util", - "indexmap 1.9.3", - "js-sys", - "once_cell", - "pin-project-lite", - "thiserror", -] - -[[package]] -name = "opentelemetry_sdk" -version = "0.18.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ca41c4933371b61c2a2f214bf16931499af4ec90543604ec828f7a625c09113" -dependencies = [ - "async-trait", - "crossbeam-channel", - "futures-channel", - "futures-executor", - "futures-util", - "once_cell", - "opentelemetry_api", - "percent-encoding", - "rand", - "thiserror", -] - [[package]] name = "os_str_bytes" version = "6.5.1" @@ -1329,12 +1240,6 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" -[[package]] -name = "percent-encoding" -version = "2.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" - [[package]] name = "pin-project" version = "1.1.3" @@ -1379,12 +1284,6 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6" -[[package]] -name = "ppv-lite86" -version = "0.2.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" - [[package]] name = "probe" version = "0.3.0" @@ -1443,36 +1342,6 @@ dependencies = [ "proc-macro2", ] -[[package]] -name = "rand" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" -dependencies = [ - "libc", - "rand_chacha", - "rand_core", -] - -[[package]] -name = "rand_chacha" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" -dependencies = [ - "ppv-lite86", - "rand_core", -] - -[[package]] -name = "rand_core" -version = "0.6.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" -dependencies = [ - "getrandom", -] - [[package]] name = "redox_syscall" version = "0.3.5" @@ -1743,12 +1612,6 @@ dependencies = [ "windows-sys", ] -[[package]] -name = "static_assertions" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" - [[package]] name = "strsim" version = "0.10.0" @@ -1796,41 +1659,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "tarpc" -version = "0.33.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f41bce44d290df0598ae4b9cd6ea7f58f651fd3aa4af1b26060c4fa32b08af7" -dependencies = [ - "anyhow", - "fnv", - "futures", - "humantime", - "opentelemetry", - "pin-project", - "rand", - "serde", - "static_assertions", - "tarpc-plugins", - "thiserror", - "tokio", - "tokio-serde", - "tokio-util", - "tracing", - "tracing-opentelemetry", -] - -[[package]] -name = "tarpc-plugins" -version = "0.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ee42b4e559f17bce0385ebf511a7beb67d5cc33c12c96b7f4e9789919d9c10f" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "tempfile" version = "3.8.0" @@ -1989,7 +1817,6 @@ dependencies = [ "futures-core", "futures-sink", "pin-project-lite", - "slab", "tokio", "tracing", ] @@ -2052,7 +1879,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ "cfg-if", - "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -2090,19 +1916,6 @@ dependencies = [ "tracing-core", ] -[[package]] -name = "tracing-opentelemetry" -version = "0.18.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21ebb87a95ea13271332df069020513ab70bdb5637ca42d6e492dc3bbbad48de" -dependencies = [ - "once_cell", - "opentelemetry", - "tracing", - "tracing-core", - "tracing-subscriber", -] - [[package]] name = "tracing-subscriber" version = "0.3.17" diff --git a/README.md b/README.md index 1171b9c..f13ad32 100644 --- a/README.md +++ b/README.md @@ -213,4 +213,3 @@ You can open the generated `flamegraph.html` with your browser. - [libbpf-async](https://github.com/fujita/libbpf-async) for asynchronous BPF ringbuf implementation over libbpf-rs - [rust-keylime](https://github.com/keylime/rust-keylime/) for permissions management code -- [tarpc](https://github.com/google/tarpc) for the pubsub example implementation diff --git a/agent/tests/agenttest/src/lib.rs b/agent/tests/agenttest/src/lib.rs index e2223a7..86995a2 100644 --- a/agent/tests/agenttest/src/lib.rs +++ b/agent/tests/agenttest/src/lib.rs @@ -3,8 +3,7 @@ use anyhow::{bail, Result}; use libbpf_rs::{Link, Map, Object, RingBufferBuilder}; -use std::env; -use std::path::{Path, PathBuf}; +use std::path::Path; use std::process::Child; use std::time::Duration; @@ -13,23 +12,6 @@ mod skel { } use skel::*; -pub fn target_dir() -> PathBuf { - env::current_exe() - .ok() - .map(|mut path| { - path.pop(); - if path.ends_with("deps") { - path.pop(); - } - path - }) - .unwrap() -} - -pub fn agent_path() -> PathBuf { - target_dir().join("crypto-auditing-agent") -} - pub fn bump_memlock_rlimit() -> Result<()> { let rlimit = libc::rlimit { rlim_cur: 128 << 20, diff --git a/agent/tests/coalesce.rs b/agent/tests/coalesce.rs index 7637fd2..dec9240 100644 --- a/agent/tests/coalesce.rs +++ b/agent/tests/coalesce.rs @@ -8,27 +8,50 @@ use crypto_auditing::types::EventGroup; use probe::probe; use serde_cbor::de::Deserializer; use std::env; -use std::panic; use std::path::PathBuf; -use std::process::Command; +use std::process::{Child, Command}; use std::thread; use std::time::Duration; use tempfile::tempdir; +fn target_dir() -> PathBuf { + env::current_exe() + .ok() + .map(|mut path| { + path.pop(); + if path.ends_with("deps") { + path.pop(); + } + path + }) + .unwrap() +} + fn fixture_dir() -> PathBuf { - PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("fixtures") + PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .parent() + .unwrap() + .join("fixtures") +} + +struct AgentProcess(Child); + +impl Drop for AgentProcess { + fn drop(&mut self) { + self.0.kill().expect("unable to kill event-broker"); + } } #[test] fn test_probe_coalesce() { bump_memlock_rlimit().expect("unable to bump memlock rlimit"); - let agent_path = agent_path(); + let agent_path = target_dir().join("crypto-auditing-agent"); let log_dir = tempdir().expect("unable to create temporary directory"); let log_path = log_dir.path().join("agent.log"); - let mut process = Command::new(&agent_path) + let process = Command::new(&agent_path) .arg("-c") - .arg(fixture_dir().join("agent.conf")) + .arg(fixture_dir().join("conf").join("agent.conf")) .arg("--log-file") .arg(&log_path) .arg("--library") @@ -38,6 +61,9 @@ fn test_probe_coalesce() { .spawn() .expect("unable to spawn agent"); + // Make sure the agent process will be killed at exit + let process = AgentProcess(process); + // Wait until the agent process starts up for _ in 0..5 { if log_path.exists() { @@ -47,59 +73,53 @@ fn test_probe_coalesce() { } assert!(log_path.exists()); - let result = panic::catch_unwind(|| { - let foo = String::from("foo\0"); - let bar = String::from("bar\0"); - let baz = String::from("baz\0"); - - let (_link, object) = - attach_bpf(&process, &agent_path).expect("unable to attach agent.bpf.o"); - let map = object.map("ringbuf").expect("unable to get ringbuf map"); - - let timeout = Duration::from_secs(10); - - let result = with_ringbuffer( - map, - || { - probe!(crypto_auditing, new_context, 1, 2); - probe!(crypto_auditing, word_data, 1, foo.as_ptr(), 3); - probe!(crypto_auditing, string_data, 1, bar.as_ptr(), bar.as_ptr()); - probe!( - crypto_auditing, - blob_data, - 1, - baz.as_ptr(), - baz.as_ptr(), - baz.len() - ); - }, - timeout, - ) - .expect("unable to exercise probe points"); - assert_eq!(result, 4); - let result = with_ringbuffer( - map, - || { - probe!(crypto_auditing, new_context, 4, 5); - }, - timeout, - ) - .expect("unable to exercise probe points"); - assert_eq!(result, 1); - - let log_file = std::fs::File::open(&log_path) - .expect(&format!("unable to read file `{}`", log_path.display())); - - let groups: Result, _> = Deserializer::from_reader(&log_file) - .into_iter::() - .collect(); - let groups = groups.expect("error deserializing"); - assert_eq!(groups.len(), 2); - assert_eq!(groups[0].events().len(), 4); - assert_eq!(groups[1].events().len(), 1); - }); - - process.kill().expect("unable to kill agent"); - - assert!(result.is_ok()); + let foo = String::from("foo\0"); + let bar = String::from("bar\0"); + let baz = String::from("baz\0"); + + let (_link, object) = + attach_bpf(&process.0, &agent_path).expect("unable to attach agent.bpf.o"); + let map = object.map("ringbuf").expect("unable to get ringbuf map"); + + let timeout = Duration::from_secs(10); + + let result = with_ringbuffer( + map, + || { + probe!(crypto_auditing, new_context, 1, 2); + probe!(crypto_auditing, word_data, 1, foo.as_ptr(), 3); + probe!(crypto_auditing, string_data, 1, bar.as_ptr(), bar.as_ptr()); + probe!( + crypto_auditing, + blob_data, + 1, + baz.as_ptr(), + baz.as_ptr(), + baz.len() + ); + }, + timeout, + ) + .expect("unable to exercise probe points"); + assert_eq!(result, 4); + let result = with_ringbuffer( + map, + || { + probe!(crypto_auditing, new_context, 4, 5); + }, + timeout, + ) + .expect("unable to exercise probe points"); + assert_eq!(result, 1); + + let log_file = std::fs::File::open(&log_path) + .expect(&format!("unable to read file `{}`", log_path.display())); + + let groups: Result, _> = Deserializer::from_reader(&log_file) + .into_iter::() + .collect(); + let groups = groups.expect("error deserializing"); + assert_eq!(groups.len(), 2); + assert_eq!(groups[0].events().len(), 4); + assert_eq!(groups[1].events().len(), 1); } diff --git a/agent/tests/no_coalesce.rs b/agent/tests/no_coalesce.rs index 7f8f62e..515d730 100644 --- a/agent/tests/no_coalesce.rs +++ b/agent/tests/no_coalesce.rs @@ -8,27 +8,50 @@ use crypto_auditing::types::EventGroup; use probe::probe; use serde_cbor::de::Deserializer; use std::env; -use std::panic; use std::path::PathBuf; -use std::process::Command; +use std::process::{Child, Command}; use std::thread; use std::time::Duration; use tempfile::tempdir; +fn target_dir() -> PathBuf { + env::current_exe() + .ok() + .map(|mut path| { + path.pop(); + if path.ends_with("deps") { + path.pop(); + } + path + }) + .unwrap() +} + fn fixture_dir() -> PathBuf { - PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("fixtures") + PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .parent() + .unwrap() + .join("fixtures") +} + +struct AgentProcess(Child); + +impl Drop for AgentProcess { + fn drop(&mut self) { + self.0.kill().expect("unable to kill event-broker"); + } } #[test] fn test_probe_no_coalesce() { bump_memlock_rlimit().expect("unable to bump memlock rlimit"); - let agent_path = agent_path(); + let agent_path = target_dir().join("crypto-auditing-agent"); let log_dir = tempdir().expect("unable to create temporary directory"); let log_path = log_dir.path().join("agent.log"); - let mut process = Command::new(&agent_path) + let process = Command::new(&agent_path) .arg("-c") - .arg(fixture_dir().join("agent.conf")) + .arg(fixture_dir().join("conf").join("agent.conf")) .arg("--log-file") .arg(&log_path) .arg("--library") @@ -36,6 +59,9 @@ fn test_probe_no_coalesce() { .spawn() .expect("unable to spawn agent"); + // Make sure the agent process will be killed at exit + let process = AgentProcess(process); + // Wait until the agent starts up for _ in 0..5 { if log_path.exists() { @@ -45,86 +71,80 @@ fn test_probe_no_coalesce() { } assert!(log_path.exists()); - let result = panic::catch_unwind(|| { - let foo = String::from("foo\0"); - let bar = String::from("bar\0"); - let baz = String::from("baz\0"); - - let (_link, object) = - attach_bpf(&process, &agent_path).expect("unable to attach agent.bpf.o"); - let map = object.map("ringbuf").expect("unable to get ringbuf map"); - - let timeout = Duration::from_secs(10); - - let result = with_ringbuffer( - map, - || { - probe!(crypto_auditing, new_context, 1, 2); - }, - timeout, - ) - .expect("unable to exercise probe points"); - assert_eq!(result, 1); - let result = with_ringbuffer( - map, - || { - probe!(crypto_auditing, word_data, 1, foo.as_ptr(), 3); - }, - timeout, - ) - .expect("unable to exercise probe points"); - assert_eq!(result, 1); - let result = with_ringbuffer( - map, - || { - probe!(crypto_auditing, string_data, 1, bar.as_ptr(), bar.as_ptr()); - }, - timeout, - ) - .expect("unable to exercise probe points"); - assert_eq!(result, 1); - let result = with_ringbuffer( - map, - || { - probe!( - crypto_auditing, - blob_data, - 1, - baz.as_ptr(), - baz.as_ptr(), - baz.len() - ); - }, - timeout, - ) - .expect("unable to exercise probe points"); - assert_eq!(result, 1); - let result = with_ringbuffer( - map, - || { - probe!(crypto_auditing, new_context, 4, 5); - }, - timeout, - ) - .expect("unable to exercise probe points"); - assert_eq!(result, 1); - - let log_file = std::fs::File::open(&log_path) - .expect(&format!("unable to read file `{}`", log_path.display())); - - let groups: Result, _> = Deserializer::from_reader(&log_file) - .into_iter::() - .collect(); - let groups = groups.expect("error deserializing"); - assert_eq!(groups.len(), 5); - assert_eq!(groups[0].events().len(), 1); - assert_eq!(groups[1].events().len(), 1); - assert_eq!(groups[2].events().len(), 1); - assert_eq!(groups[3].events().len(), 1); - assert_eq!(groups[4].events().len(), 1); - }); - - process.kill().expect("unable to kill agent"); - - assert!(result.is_ok()); + let foo = String::from("foo\0"); + let bar = String::from("bar\0"); + let baz = String::from("baz\0"); + + let (_link, object) = + attach_bpf(&process.0, &agent_path).expect("unable to attach agent.bpf.o"); + let map = object.map("ringbuf").expect("unable to get ringbuf map"); + + let timeout = Duration::from_secs(10); + + let result = with_ringbuffer( + map, + || { + probe!(crypto_auditing, new_context, 1, 2); + }, + timeout, + ) + .expect("unable to exercise probe points"); + assert_eq!(result, 1); + let result = with_ringbuffer( + map, + || { + probe!(crypto_auditing, word_data, 1, foo.as_ptr(), 3); + }, + timeout, + ) + .expect("unable to exercise probe points"); + assert_eq!(result, 1); + let result = with_ringbuffer( + map, + || { + probe!(crypto_auditing, string_data, 1, bar.as_ptr(), bar.as_ptr()); + }, + timeout, + ) + .expect("unable to exercise probe points"); + assert_eq!(result, 1); + let result = with_ringbuffer( + map, + || { + probe!( + crypto_auditing, + blob_data, + 1, + baz.as_ptr(), + baz.as_ptr(), + baz.len() + ); + }, + timeout, + ) + .expect("unable to exercise probe points"); + assert_eq!(result, 1); + let result = with_ringbuffer( + map, + || { + probe!(crypto_auditing, new_context, 4, 5); + }, + timeout, + ) + .expect("unable to exercise probe points"); + assert_eq!(result, 1); + + let log_file = std::fs::File::open(&log_path) + .expect(&format!("unable to read file `{}`", log_path.display())); + + let groups: Result, _> = Deserializer::from_reader(&log_file) + .into_iter::() + .collect(); + let groups = groups.expect("error deserializing"); + assert_eq!(groups.len(), 5); + assert_eq!(groups[0].events().len(), 1); + assert_eq!(groups[1].events().len(), 1); + assert_eq!(groups[2].events().len(), 1); + assert_eq!(groups[3].events().len(), 1); + assert_eq!(groups[4].events().len(), 1); } diff --git a/crypto-auditing/Cargo.toml b/crypto-auditing/Cargo.toml index 79c85a0..20b22b0 100644 --- a/crypto-auditing/Cargo.toml +++ b/crypto-auditing/Cargo.toml @@ -12,11 +12,11 @@ libc = "0.2" serde = { version = "1.0", features = ["derive"] } serde_cbor = "0.11" serde_with = "2.2" -tarpc = { version = "0.33", features = ["serde-transport", "unix"] } thiserror = "1.0" -tokio = "1.23" -tokio-serde = { version = "0.8", features=["cbor"] } +tokio = { version = "1.23", features = ["net", "rt"] } +tokio-serde = { version = "0.8", features = ["cbor"] } tokio-stream = "0.1" +tokio-util = { version = "0.7", features = ["codec"] } tracing = "0.1" [build-dependencies] @@ -24,7 +24,7 @@ bindgen = "0.63" [dev-dependencies] anyhow = "1.0" -clap = { version = "4", features=["derive"] } +clap = { version = "4", features = ["derive"] } [[example]] name = "client" diff --git a/crypto-auditing/src/event_broker.rs b/crypto-auditing/src/event_broker.rs index 1d6dd90..5be1ed9 100644 --- a/crypto-auditing/src/event_broker.rs +++ b/crypto-auditing/src/event_broker.rs @@ -4,8 +4,6 @@ mod error; pub use error::{Error, Result}; -mod service; - mod client; pub use client::{Client, ClientHandle}; diff --git a/crypto-auditing/src/event_broker/client.rs b/crypto-auditing/src/event_broker/client.rs index e65d39f..9e8c541 100644 --- a/crypto-auditing/src/event_broker/client.rs +++ b/crypto-auditing/src/event_broker/client.rs @@ -1,20 +1,19 @@ // SPDX-License-Identifier: GPL-3.0-or-later // Copyright (C) 2022-2023 The crypto-auditing developers. -use crate::event_broker::{error::Result, service::Subscriber as _, SOCKET_PATH}; +use crate::event_broker::{error::Result, SOCKET_PATH}; use crate::types::EventGroup; use futures::{ future::{self, AbortHandle}, stream::Stream, + SinkExt, TryStreamExt, }; use std::path::{Path, PathBuf}; -use tarpc::{ - context, - server::{self, Channel}, - tokio_serde::formats::Cbor, -}; +use tokio::net::UnixStream; use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio_serde::{formats::SymmetricalCbor, SymmetricallyFramed}; use tokio_stream::wrappers::ReceiverStream; +use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; use tracing::info; #[derive(Clone, Debug)] @@ -54,20 +53,6 @@ pub struct Client { receiver: Receiver, } -#[tarpc::server] -impl crate::event_broker::service::Subscriber for ClientInner { - async fn scopes(self, _: context::Context) -> Vec { - self.scopes.clone() - } - - async fn receive(self, _: context::Context, group: EventGroup) { - if let Err(e) = self.sender.send(group).await { - info!(error = %e, - "unable to send event"); - } - } -} - /// A handle for the client connection, which will be aborted once /// the ownership is dropped pub struct ClientHandle(AbortHandle); @@ -110,11 +95,42 @@ impl Client { /// This returns a tuple consisting a [`ClientHandle`] and a [`Stream`] /// which generates a sequence of event groups. pub async fn start(self) -> Result<(ClientHandle, impl Stream)> { - let server = tarpc::serde_transport::unix::connect(&self.address, Cbor::default).await?; - let local_addr = server.local_addr()?; - let handler = server::BaseChannel::with_defaults(server).requests(); - let (handler, abort_handle) = - future::abortable(handler.execute(self.inner.clone().serve())); + let stream = UnixStream::connect(&self.address).await?; + let local_addr = stream.local_addr()?; + + let (de, ser) = stream.into_split(); + + let ser = FramedWrite::new(ser, LengthDelimitedCodec::new()); + let de = FramedRead::new(de, LengthDelimitedCodec::new()); + + let mut ser = SymmetricallyFramed::new(ser, SymmetricalCbor::>::default()); + let mut de = SymmetricallyFramed::new(de, SymmetricalCbor::::default()); + + let inner = self.inner.clone(); + let (handler, abort_handle) = future::abortable(async move { + if let Err(e) = ser.send(inner.scopes).await { + info!(error = %e, + "unable to send subscription request"); + } + loop { + let group = match de.try_next().await { + Ok(group) => group, + Err(e) => { + info!(error = %e, + "unable to deserialize event"); + break; + } + }; + + if let Some(group) = group { + if let Err(e) = inner.sender.send(group).await { + info!(error = %e, + "unable to send event"); + break; + } + } + } + }); tokio::spawn(async move { match handler.await { Ok(()) | Err(future::Aborted) => info!(?local_addr, "client shutdown."), diff --git a/crypto-auditing/src/event_broker/service.rs b/crypto-auditing/src/event_broker/service.rs deleted file mode 100644 index 79c6ea5..0000000 --- a/crypto-auditing/src/event_broker/service.rs +++ /dev/null @@ -1,10 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later -// Copyright (C) 2022-2023 The crypto-auditing developers. - -use crate::types::EventGroup; - -#[tarpc::service] -pub trait Subscriber { - async fn scopes() -> Vec; - async fn receive(group: EventGroup); -} diff --git a/event-broker/Cargo.toml b/event-broker/Cargo.toml index 8378081..8b28aa4 100644 --- a/event-broker/Cargo.toml +++ b/event-broker/Cargo.toml @@ -19,10 +19,13 @@ futures = "0.3" inotify = "0.10.2" libsystemd = { version = "0.6", optional = true } serde_cbor = "0.11" -tarpc = { version = "0.33", features = ["serde-transport", "unix"] } tokio = { version = "1.23", features = ["macros", "rt-multi-thread"] } tokio-serde = { version = "0.8", features = ["cbor"] } tokio-stream = "0.1" +tokio-util = "0.7" toml = "0.6" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } + +[dev-dependencies] +tempfile = "3" diff --git a/event-broker/src/main.rs b/event-broker/src/main.rs index ae48e4f..4dec61d 100644 --- a/event-broker/src/main.rs +++ b/event-broker/src/main.rs @@ -5,7 +5,7 @@ use anyhow::bail; use anyhow::{Context as _, Result}; use crypto_auditing::types::EventGroup; -use futures::{future, stream::StreamExt, try_join}; +use futures::{future, stream::StreamExt, try_join, SinkExt, TryStreamExt}; use inotify::{EventMask, Inotify, WatchMask}; #[cfg(feature = "libsystemd")] use libsystemd::activation::receive_descriptors; @@ -17,18 +17,16 @@ use std::os::fd::{FromRawFd, IntoRawFd}; use std::os::unix::net::UnixListener as StdUnixListener; use std::path::{Path, PathBuf}; use std::sync::{Arc, RwLock}; -use tarpc::{client, context, tokio_serde::formats::Cbor}; -use tokio::net::UnixListener; +use tokio::net::{unix::OwnedWriteHalf, UnixListener}; use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio_serde::{formats::SymmetricalCbor, SymmetricallyFramed}; use tokio_stream::wrappers::ReceiverStream; +use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; use tracing::{debug, info}; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; mod config; -mod service; -use service::SubscriberClient; - struct Reader { log_file: PathBuf, } @@ -70,13 +68,18 @@ impl Reader { } } -#[derive(Clone, Debug)] +#[derive(Debug)] struct Subscription { - client: SubscriberClient, + stream: SymmetricallyFramed< + FramedWrite, + EventGroup, + SymmetricalCbor, + >, scopes: Vec, + errored: bool, } -#[derive(Clone, Debug)] +#[derive(Debug)] struct Publisher { socket_path: PathBuf, subscriptions: Arc>>, @@ -119,34 +122,27 @@ impl Publisher { tokio::spawn(async move { while let Ok((stream, _sock_addr)) = listener.accept().await { - let conn = tarpc::serde_transport::Transport::from((stream, Cbor::default())); - let subscriber_fd = conn.get_ref().as_raw_fd(); - - let tarpc::client::NewClient { - client: subscriber, - dispatch, - } = SubscriberClient::new(client::Config::default(), conn); + let subscriber_fd = stream.as_raw_fd(); debug!(socket = subscriber_fd, "subscriber connected"); - let subscriptions2 = subscriptions.clone(); - tokio::spawn(async move { - if let Err(e) = dispatch.await { - info!(error = %e, - "subscriber connection broken"); - } + let (de, ser) = stream.into_split(); - debug!(socket = %subscriber_fd, "closing connection"); - subscriptions2.write().unwrap().remove(&subscriber_fd); - }); + let ser = FramedWrite::new(ser, LengthDelimitedCodec::new()); + let de = FramedRead::new(de, LengthDelimitedCodec::new()); + + let ser = SymmetricallyFramed::new(ser, SymmetricalCbor::::default()); + let mut de = + SymmetricallyFramed::new(de, SymmetricalCbor::>::default()); // Populate the scopes - if let Ok(scopes) = subscriber.scopes(context::current()).await { + if let Some(scopes) = de.try_next().await.unwrap() { subscriptions.write().unwrap().insert( subscriber_fd, Subscription { - client: subscriber, - scopes: scopes.clone(), + stream: ser, + scopes, + errored: Default::default(), }, ); } @@ -154,20 +150,27 @@ impl Publisher { }); let mut stream = ReceiverStream::new(receiver); - let mut subscriptions; while let Some(group) = stream.next().await { + let mut subscriptions = self.subscriptions.write().unwrap(); let mut publications = Vec::new(); - subscriptions = self.subscriptions.read().unwrap().clone(); - for subscription in subscriptions.values() { + for (_, subscription) in subscriptions.iter_mut() { let mut group = group.clone(); group.events_filtered(&subscription.scopes); if !group.events().is_empty() { - publications.push(subscription.client.receive(context::current(), group)); + publications.push(async move { + if let Err(e) = subscription.stream.send(group).await { + info!(error = %e, "unable to send event"); + subscription.errored = true; + } + }); } } future::join_all(publications).await; + + // Remove errored subscriptions + subscriptions.retain(|_, v| !v.errored); } Ok(()) diff --git a/event-broker/src/service.rs b/event-broker/src/service.rs deleted file mode 100644 index aec5ccb..0000000 --- a/event-broker/src/service.rs +++ /dev/null @@ -1,10 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later -// Copyright (C) 2022-2023 The crypto-auditing developers. - -use crypto_auditing::types::EventGroup; - -#[tarpc::service] -pub trait Subscriber { - async fn scopes() -> Vec; - async fn receive(group: EventGroup); -} diff --git a/event-broker/tests/test.rs b/event-broker/tests/test.rs new file mode 100644 index 0000000..e46a987 --- /dev/null +++ b/event-broker/tests/test.rs @@ -0,0 +1,107 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +// Copyright (C) 2023 The crypto-auditing developers. + +use crypto_auditing::event_broker::Client; +use futures::stream::StreamExt; +use std::env; +use std::fs; +use std::io::{Read, Write}; +use std::path::PathBuf; +use std::process::{Child, Command}; +use std::thread; +use std::time::Duration; +use tempfile::tempdir; +use tracing_subscriber::{fmt, prelude::*, EnvFilter}; + +fn fixture_dir() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .parent() + .unwrap() + .join("fixtures") +} + +fn target_dir() -> PathBuf { + env::current_exe() + .ok() + .map(|mut path| { + path.pop(); + if path.ends_with("deps") { + path.pop(); + } + path + }) + .unwrap() +} + +struct EventBrokerProcess(Child); + +impl Drop for EventBrokerProcess { + fn drop(&mut self) { + self.0.kill().expect("unable to kill event-broker"); + } +} + +#[tokio::test] +async fn test_event_broker() { + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::from_default_env()) + .try_init() + .expect("unable to initialize subscriber"); + + let event_broker_path = target_dir().join("crypto-auditing-event-broker"); + let test_dir = tempdir().expect("unable to create temporary directory"); + + let log_path = test_dir.path().join("agent.log"); + let mut log_file = fs::OpenOptions::new() + .write(true) + .create(true) + .append(true) + .open(&log_path) + .expect("unable to write log file"); + + let socket_path = test_dir.path().join("audit.sock"); + + let process = Command::new(&event_broker_path) + .arg("-c") + .arg(fixture_dir().join("conf").join("event-broker.conf")) + .arg("--log-file") + .arg(&log_path) + .arg("--socket-path") + .arg(&socket_path) + .spawn() + .expect("unable to spawn event-broker"); + + let _process = EventBrokerProcess(process); + + // Wait until the agent starts up + for _ in 0..5 { + if socket_path.exists() { + break; + } + thread::sleep(Duration::from_millis(100)); + } + assert!(socket_path.exists()); + + let client = Client::new() + .scopes(&vec!["tls".to_string()]) + .address(&socket_path); + + let (_handle, mut reader) = client.start().await.expect("unable to start client"); + + // Append more data to log file + let mut fixture_file = fs::OpenOptions::new() + .read(true) + .open(&fixture_dir().join("normal").join("output.cborseq")) + .expect("unable to open fixture"); + let mut buffer = Vec::new(); + fixture_file + .read_to_end(&mut buffer) + .expect("unable to read fixture"); + log_file + .write_all(&buffer) + .expect("unable to append fixture"); + log_file.flush().expect("unable to flush fixture"); + + assert!(reader.next().await.is_some()); +} diff --git a/agent/fixtures/agent.conf b/fixtures/conf/agent.conf similarity index 81% rename from agent/fixtures/agent.conf rename to fixtures/conf/agent.conf index 44ae95c..46747ef 100644 --- a/agent/fixtures/agent.conf +++ b/fixtures/conf/agent.conf @@ -2,6 +2,4 @@ # log_file = "/var/log/crypto-auditing/audit.cborseq" # user = "crypto-auditing:crypto-auditing" # coalesce_window = 100 -# max_events = 1000000 - -coalesce_window = 0 +# max_events = 1000000 \ No newline at end of file diff --git a/fixtures/conf/event-broker.conf b/fixtures/conf/event-broker.conf new file mode 100644 index 0000000..2eeeb73 --- /dev/null +++ b/fixtures/conf/event-broker.conf @@ -0,0 +1,2 @@ +# log_file = "/var/log/crypto-auditing/audit.cborseq" +# socket_path = "/run/crypto-auditing/audit.sock"