Skip to content

Commit

Permalink
event-broker: Turn listening task into a separate async function
Browse files Browse the repository at this point in the history
Signed-off-by: Daiki Ueno <[email protected]>
  • Loading branch information
ueno committed Apr 1, 2024
1 parent 257fbaf commit f8a5cf3
Showing 1 changed file with 32 additions and 25 deletions.
57 changes: 32 additions & 25 deletions event-broker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,41 +117,41 @@ impl Publisher {
Ok(StdUnixListener::bind(&self.socket_path)?)
}

async fn publish(&self, receiver: Receiver<EventGroup>) -> Result<()> {
async fn listen(&self) -> Result<()> {
let std_listener = self.get_std_listener()?;
std_listener.set_nonblocking(true)?;
let listener = UnixListener::from_std(std_listener)?;
let subscriptions = self.subscriptions.clone();

tokio::spawn(async move {
while let Ok((stream, _sock_addr)) = listener.accept().await {
let subscriber_fd = stream.as_raw_fd();
while let Ok((stream, _sock_addr)) = listener.accept().await {
let subscriber_fd = stream.as_raw_fd();

debug!(socket = subscriber_fd, "subscriber connected");
debug!(socket = subscriber_fd, "subscriber connected");

let (de, ser) = stream.into_split();
let (de, ser) = stream.into_split();

let ser = FramedWrite::new(ser, LengthDelimitedCodec::new());
let de = FramedRead::new(de, LengthDelimitedCodec::new());
let ser = FramedWrite::new(ser, LengthDelimitedCodec::new());
let de = FramedRead::new(de, LengthDelimitedCodec::new());

let ser = SymmetricallyFramed::new(ser, SymmetricalCbor::<EventGroup>::default());
let mut de =
SymmetricallyFramed::new(de, SymmetricalCbor::<Vec<String>>::default());
let ser = SymmetricallyFramed::new(ser, SymmetricalCbor::<EventGroup>::default());
let mut de =
SymmetricallyFramed::new(de, SymmetricalCbor::<Vec<String>>::default());

// Populate the scopes
if let Some(scopes) = de.try_next().await.unwrap() {
subscriptions.write().await.insert(
subscriber_fd,
Subscription {
stream: ser,
scopes,
errored: Default::default(),
},
);
}
// Populate the scopes
if let Some(scopes) = de.try_next().await.unwrap() {
self.subscriptions.write().await.insert(
subscriber_fd,
Subscription {
stream: ser,
scopes,
errored: Default::default(),
},
);
}
});
}
Ok(())
}

async fn publish(&self, receiver: Receiver<EventGroup>) -> Result<()> {
let mut stream = ReceiverStream::new(receiver);
while let Some(group) = stream.next().await {
let mut subscriptions = self.subscriptions.write().await;
Expand All @@ -174,6 +174,9 @@ impl Publisher {

// Remove errored subscriptions
subscriptions.retain(|_, v| !v.errored);
if subscriptions.is_empty() {
break;
}
}

Ok(())
Expand All @@ -193,5 +196,9 @@ async fn main() -> anyhow::Result<()> {
let publisher = Publisher::new(&config.socket_path);

let (tx, rx) = mpsc::channel::<EventGroup>(10);
try_join!(reader.read(tx), publisher.publish(rx),).map(|_| ())
try_join!(
reader.read(tx),
publisher.listen(),
publisher.publish(rx),
).map(|_| ())
}

0 comments on commit f8a5cf3

Please sign in to comment.