Skip to content

Commit

Permalink
event-broker: Shut down activated process where possible
Browse files Browse the repository at this point in the history
When event-broker is launched through systemd activation and no
subscribers remain, we don't need to poll inotify and incoming
connections until a new subscriber is online.

This also adds handling of Ctrl-C to stop the deamon if it is not
activated through systemd.

Signed-off-by: Daiki Ueno <[email protected]>
  • Loading branch information
ueno committed Nov 26, 2023
1 parent c4ebc21 commit 4d2fbf2
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 69 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ thiserror = "1.0"
time = "0.3"
tokio = "1.23"
tokio-serde = { version = "0.8", features = ["cbor"] }
tokio-stream = "0.1"
tokio-stream = { version = "0.1", features = ["sync"] }
tokio-util = { version = "0.7", features = ["codec"] }
toml = "0.7"
tracing = "0.1"
Expand Down
2 changes: 1 addition & 1 deletion event-broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ futures.workspace = true
inotify.workspace = true
libsystemd = { version = "0.7", optional = true }
serde_cbor.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] }
tokio-serde.workspace = true
tokio-stream.workspace = true
tokio-util.workspace = true
Expand Down
15 changes: 10 additions & 5 deletions event-broker/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ impl Config {
}

fn merge_config_file(&mut self, file: impl AsRef<Path>) -> Result<()> {
let s = fs::read_to_string(file.as_ref())
.with_context(|| format!("unable to read config file `{}`", file.as_ref().display()))?;
let s = fs::read_to_string(file.as_ref()).with_context(|| {
format!("unable to read config file `{}`", file.as_ref().display())
})?;
let config = Table::from_str(&s).with_context(|| {
format!("unable to parse config file `{}`", file.as_ref().display())
})?;
Expand All @@ -90,11 +91,15 @@ impl Config {
}

fn merge_arg_matches(&mut self, matches: &ArgMatches) -> Result<()> {
if let Some(ValueSource::CommandLine) = matches.value_source("log-file") {
self.log_file = matches.try_get_one::<PathBuf>("log-file")?.unwrap().clone();
if let Some(ValueSource::CommandLine) = matches.value_source("log-file")
{
self.log_file =
matches.try_get_one::<PathBuf>("log-file")?.unwrap().clone();
}

if let Some(ValueSource::CommandLine) = matches.value_source("socket-path") {
if let Some(ValueSource::CommandLine) =
matches.value_source("socket-path")
{
self.socket_path = matches
.try_get_one::<PathBuf>("socket-path")?
.unwrap()
Expand Down
221 changes: 160 additions & 61 deletions event-broker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,26 @@
use anyhow::bail;
use anyhow::{Context as _, Result};
use crypto_auditing::types::EventGroup;
use futures::{future, stream::StreamExt, try_join, SinkExt, TryStreamExt};
use futures::{future, stream::StreamExt, Stream, try_join, SinkExt, TryStreamExt};
use inotify::{EventMask, Inotify, WatchMask};
#[cfg(feature = "libsystemd")]
use libsystemd::activation::receive_descriptors;
use serde_cbor::de::Deserializer;
use std::collections::HashMap;
use std::fs;
use std::marker;
use std::os::fd::{AsRawFd, RawFd};
#[cfg(feature = "libsystemd")]
use std::os::fd::{FromRawFd, IntoRawFd};
use std::os::unix::net::UnixListener as StdUnixListener;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use tokio::net::{unix::OwnedWriteHalf, UnixListener};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::net::{unix::OwnedWriteHalf, UnixListener, UnixStream};
use tokio::signal;
use tokio::sync::{
broadcast,
mpsc,
};
use tokio_serde::{formats::SymmetricalCbor, SymmetricallyFramed};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
Expand All @@ -37,30 +43,41 @@ impl Reader {
Self { log_file }
}

async fn read(&self, sender: Sender<EventGroup>) -> Result<()> {
let inotify =
Inotify::init().with_context(|| "unable to initialize inotify".to_string())?;
async fn read(
&self,
event_sender: &mpsc::Sender<EventGroup>,
shutdown_receiver: &mut broadcast::Receiver<()>,
) -> Result<()> {
let inotify = Inotify::init()
.with_context(|| "unable to initialize inotify".to_string())?;
inotify
.watches()
.add(&self.log_file, WatchMask::MODIFY | WatchMask::CREATE)
.with_context(|| format!("unable to monitor {}", self.log_file.display()))?;
let mut file = std::fs::File::open(&self.log_file).ok();
.with_context(|| {
format!("unable to monitor {}", self.log_file.display())
})?;
let mut file = fs::File::open(&self.log_file)
.with_context(|| {
format!("unable to open {}", self.log_file.display())
})?;

let mut buffer = [0; 1024];
let mut stream = inotify.into_event_stream(&mut buffer)?;

while let Some(event_or_error) = stream.next().await {
let event = event_or_error?;
if event.mask.contains(EventMask::CREATE) {
let new_file = std::fs::File::open(&self.log_file).with_context(|| {
format!("unable to read file `{}`", self.log_file.display())
})?;
let _old = file.replace(new_file);
}
if let Some(ref file) = file {
for group in Deserializer::from_reader(file).into_iter::<EventGroup>() {
sender.send(group?).await?
}
loop {
tokio::select! {
Some(event_or_error) = stream.next() => {
let event = event_or_error?;
if event.mask.contains(EventMask::CREATE) {
file = fs::File::open(&self.log_file).with_context(|| {
format!("unable to read file `{}`", self.log_file.display())
})?;
}
for group in Deserializer::from_reader(&mut file).into_iter::<EventGroup>() {
event_sender.send(group?).await?
}
},
_ = shutdown_receiver.recv() => break,
}
}

Expand All @@ -83,6 +100,7 @@ struct Subscription {
struct Publisher {
socket_path: PathBuf,
subscriptions: Arc<RwLock<HashMap<RawFd, Subscription>>>,
activated: Arc<RwLock<bool>>,
}

impl Publisher {
Expand All @@ -91,21 +109,28 @@ impl Publisher {
Self {
socket_path,
subscriptions: Arc::new(RwLock::new(HashMap::new())),
activated: Arc::new(RwLock::new(false)),
}
}

#[cfg(feature = "libsystemd")]
fn get_std_listener(&self) -> Result<StdUnixListener> {
if let Ok(mut descriptors) = receive_descriptors(false) {
if descriptors.len() > 1 {
bail!("too many file descriptors");
} else if descriptors.is_empty() {
bail!("no file descriptors received");
match receive_descriptors(false) {
Ok(mut descriptors) => {
if descriptors.len() > 1 {
bail!("too many file descriptors");
} else if descriptors.is_empty() {
bail!("no file descriptors received");
}
let fd = descriptors.pop().unwrap().into_raw_fd();
let mut activated = self.activated.write().unwrap();
*activated = true;
Ok(unsafe { StdUnixListener::from_raw_fd(fd) })
}
Err(e) => {
info!(error = %e, "unable to receive file descriptors");
Ok(StdUnixListener::bind(&self.socket_path)?)
}
let fd = descriptors.pop().unwrap().into_raw_fd();
Ok(unsafe { StdUnixListener::from_raw_fd(fd) })
} else {
Ok(StdUnixListener::bind(&self.socket_path)?)
}
}

Expand All @@ -114,43 +139,82 @@ impl Publisher {
Ok(StdUnixListener::bind(&self.socket_path)?)
}

async fn listen(&self) -> Result<()> {
let std_listener = self.get_std_listener()?;
std_listener.set_nonblocking(true)?;
let listener = UnixListener::from_std(std_listener)?;
async fn accept_subscriber(&self, stream: UnixStream) -> Result<()> {
let subscriber_fd = stream.as_raw_fd();

while let Ok((stream, _sock_addr)) = listener.accept().await {
let subscriber_fd = stream.as_raw_fd();
debug!(socket = subscriber_fd, "subscriber connected");

debug!(socket = subscriber_fd, "subscriber connected");
let (de, ser) = stream.into_split();

let (de, ser) = stream.into_split();
let ser = FramedWrite::new(ser, LengthDelimitedCodec::new());
let de = FramedRead::new(de, LengthDelimitedCodec::new());

let ser = FramedWrite::new(ser, LengthDelimitedCodec::new());
let de = FramedRead::new(de, LengthDelimitedCodec::new());
let ser = SymmetricallyFramed::new(
ser,
SymmetricalCbor::<EventGroup>::default(),
);
let mut de = SymmetricallyFramed::new(
de,
SymmetricalCbor::<Vec<String>>::default(),
);

let ser = SymmetricallyFramed::new(ser, SymmetricalCbor::<EventGroup>::default());
let mut de =
SymmetricallyFramed::new(de, SymmetricalCbor::<Vec<String>>::default());
// Populate the scopes
if let Some(scopes) = de.try_next().await.unwrap() {
self.subscriptions.write().unwrap().insert(
subscriber_fd,
Subscription {
stream: ser,
scopes,
errored: Default::default(),
},
);
}
Ok(())
}

async fn listen(
&self,
shutdown_receiver: &mut broadcast::Receiver<()>,
) -> Result<()> {
let std_listener = self.get_std_listener()?;
std_listener.set_nonblocking(true)?;
let listener = UnixListener::from_std(std_listener)?;

// Populate the scopes
if let Some(scopes) = de.try_next().await.unwrap() {
self.subscriptions.write().unwrap().insert(
subscriber_fd,
Subscription {
stream: ser,
scopes,
errored: Default::default(),
},
);
loop {
tokio::select! {
maybe_stream = listener.accept() => {
let stream = match maybe_stream {
Ok((stream, _sock_addr)) => stream,
Err(e) => {
info!(error = %e, "unable to accept connection");
break;
}
};
if let Err(e) = self.accept_subscriber(stream).await {
info!(error = %e, "unable to accept subscriber");
break;
}
},
_ = shutdown_receiver.recv() => {
if !*self.activated.read().unwrap() {
drop(listener);
if let Err(e) = fs::remove_file(&self.socket_path) {
info!(error = %e, "error removing socket");
}
}
break;
},
}
}
Ok(())
}

async fn publish(&self, receiver: Receiver<EventGroup>) -> Result<()> {
let mut stream = ReceiverStream::new(receiver);
while let Some(group) = stream.next().await {
async fn publish(
&self,
mut event_stream: impl Stream<Item = EventGroup> + marker::Unpin,
shutdown_sender: &broadcast::Sender<()>,
) -> Result<()> {
while let Some(group) = event_stream.next().await {
let mut subscriptions = self.subscriptions.write().unwrap();
let mut publications = Vec::new();

Expand All @@ -171,7 +235,12 @@ impl Publisher {

// Remove errored subscriptions
subscriptions.retain(|_, v| !v.errored);
if subscriptions.is_empty() {

if *self.activated.read().unwrap() && subscriptions.is_empty() {
info!("shutting down event broker");
if let Err(e) = shutdown_sender.send(()) {
info!(error = %e, "unable to send shutdown");
}
break;
}
}
Expand All @@ -180,6 +249,28 @@ impl Publisher {
}
}

async fn shutdown(
shutdown_receiver: &mut broadcast::Receiver<()>,
shutdown_sender: &broadcast::Sender<()>,
) -> Result<()> {
loop {
tokio::select! {
maybe_value = signal::ctrl_c() => {
if let Err(e) = maybe_value {
info!(error = %e, "error receiving ctrl-c")
}
info!("shutting down event broker");
if let Err(e) = shutdown_sender.send(()) {
info!(error = %e, "unable to send shutdown");
}
break;
},
_ = shutdown_receiver.recv() => break,
}
}
Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let config = config::Config::new()?;
Expand All @@ -192,10 +283,18 @@ async fn main() -> anyhow::Result<()> {
let reader = Reader::new(&config.log_file);
let publisher = Publisher::new(&config.socket_path);

let (tx, rx) = mpsc::channel::<EventGroup>(10);
let (event_tx, event_rx) = mpsc::channel::<EventGroup>(10);
let mut event_rx = ReceiverStream::new(event_rx);

let (shutdown_tx, mut shutdown_rx1) = broadcast::channel::<()>(2);
let mut shutdown_rx2 = shutdown_tx.subscribe();
let mut shutdown_rx3 = shutdown_tx.subscribe();

try_join!(
reader.read(tx),
publisher.listen(),
publisher.publish(rx),
).map(|_| ())
shutdown(&mut shutdown_rx1, &shutdown_tx),
reader.read(&event_tx, &mut shutdown_rx2),
publisher.listen(&mut shutdown_rx3),
publisher.publish(&mut event_rx, &shutdown_tx),
)
.map(|_| ())
}
3 changes: 2 additions & 1 deletion event-broker/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ async fn test_event_broker() {
.scopes(&vec!["tls".to_string()])
.address(&socket_path);

let (_handle, mut reader) = client.start().await.expect("unable to start client");
let (_handle, mut reader) =
client.start().await.expect("unable to start client");

// Append more data to log file
let mut fixture_file = fs::OpenOptions::new()
Expand Down

0 comments on commit 4d2fbf2

Please sign in to comment.