Skip to content

Commit

Permalink
Added error handling for PubSub operations. Sending corresponding err…
Browse files Browse the repository at this point in the history
…or messages back on channel.
  • Loading branch information
dmackdev committed Sep 21, 2023
1 parent 6d50fc1 commit 0be5f17
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 36 deletions.
3 changes: 2 additions & 1 deletion pubsubman/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ impl App {

self.exit_state.subscription_cleanup_state = SubscriptionCleanupState::Complete;
}
BackendMessage::Error(_) => {}
},
Err(_err) => {} //println!("{:?}", err),
Err(_err) => {}
}
}

Expand Down
82 changes: 47 additions & 35 deletions pubsubman_backend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use google_cloud_pubsub::{
client::{Client, ClientConfig},
subscription::SubscriptionConfig,
};
use message::{BackendMessage, FrontendMessage};
use message::{BackendError, BackendMessage, FrontendMessage};
use model::{PubsubMessageToPublish, SubscriptionName, TopicName};
use tokio::{
runtime::{Builder, Runtime},
Expand Down Expand Up @@ -78,18 +78,15 @@ impl Backend {
let client = self.client.clone();

self.rt.spawn(async move {
let topics = client
let message = client
.get_topics(None)
.await
.unwrap()
.into_iter()
.map(TopicName)
.collect();
.map(|topics| {
BackendMessage::TopicsUpdated(topics.into_iter().map(TopicName).collect())
})
.unwrap_or_else(|_| BackendMessage::Error(BackendError::GetTopicsFailed));

back_tx
.send(BackendMessage::TopicsUpdated(topics))
.await
.unwrap();
back_tx.send(message).await.unwrap();
});
}

Expand All @@ -98,25 +95,23 @@ impl Backend {
let client = self.client.clone();

self.rt.spawn(async move {
let sub_name = format!("pubsubman-subscription-{}", Uuid::new_v4());

let subscription = client
let message = match client
.create_subscription(
&sub_name,
&format!("pubsubman-subscription-{}", Uuid::new_v4()),
&topic_name.0,
SubscriptionConfig::default(),
None,
)
.await
.unwrap();
{
Ok(subscription) => {
let fq_sub_name = subscription.fully_qualified_name().to_owned();
BackendMessage::SubscriptionCreated(topic_name, SubscriptionName(fq_sub_name))
}
Err(_) => BackendMessage::Error(BackendError::CreateSubscriptionFailed(topic_name)),
};

back_tx
.send(BackendMessage::SubscriptionCreated(
topic_name,
SubscriptionName(subscription.fully_qualified_name().to_owned()),
))
.await
.unwrap();
back_tx.send(message).await.unwrap();
});
}

Expand Down Expand Up @@ -158,19 +153,27 @@ impl Backend {
let subscription = client.subscription(&sub_name.0);

let pull_messages_future = async move {
let mut stream = subscription.subscribe(None).await.unwrap();

while let Some(message) = stream.next().await {
let _ = message.ack().await;

back_tx
.send(BackendMessage::MessageReceived(
topic_name.clone(),
message.into(),
))
match subscription.subscribe(None).await {
Ok(mut stream) => {
while let Some(message) = stream.next().await {
let _ = message.ack().await;

back_tx
.send(BackendMessage::MessageReceived(
topic_name.clone(),
message.into(),
))
.await
.unwrap();
}
}
Err(_) => back_tx
.send(BackendMessage::Error(BackendError::StreamMessagesFailed(
topic_name, sub_name,
)))
.await
.unwrap();
}
.unwrap(),
};
};

select! {
Expand All @@ -181,13 +184,22 @@ impl Backend {
}

fn publish_message(&self, topic_name: TopicName, message: PubsubMessageToPublish) {
let back_tx = self.back_tx.clone();
let client = self.client.clone();

self.rt.spawn(async move {
let topic = client.topic(&topic_name.0);
let publisher = topic.new_publisher(None);
let awaiter = publisher.publish(message.into()).await;
awaiter.get().await

if let Err(_) = awaiter.get().await {

Check failure on line 195 in pubsubman_backend/src/lib.rs

View workflow job for this annotation

GitHub Actions / Clippy

redundant pattern matching, consider using `is_err()`
back_tx
.send(BackendMessage::Error(BackendError::PublishMessageFailed(
topic_name,
)))
.await
.unwrap()
}
});
}
}
Expand Down
9 changes: 9 additions & 0 deletions pubsubman_backend/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,13 @@ pub enum BackendMessage {
SubscriptionCreated(TopicName, SubscriptionName),
MessageReceived(TopicName, PubsubMessage),
SubscriptionsDeleted(Vec<Result<SubscriptionName, SubscriptionName>>),
Error(BackendError),
}

#[derive(Debug)]
pub enum BackendError {
GetTopicsFailed,
CreateSubscriptionFailed(TopicName),
StreamMessagesFailed(TopicName, SubscriptionName),
PublishMessageFailed(TopicName),
}

0 comments on commit 0be5f17

Please sign in to comment.