diff --git a/pubsubman/src/app.rs b/pubsubman/src/app.rs index 57fdca6..3f60060 100644 --- a/pubsubman/src/app.rs +++ b/pubsubman/src/app.rs @@ -91,8 +91,9 @@ impl App { self.exit_state.subscription_cleanup_state = SubscriptionCleanupState::Complete; } + BackendMessage::Error(_) => {} }, - Err(_err) => {} //println!("{:?}", err), + Err(_err) => {} } } diff --git a/pubsubman_backend/src/lib.rs b/pubsubman_backend/src/lib.rs index b75dee9..6ef4b84 100644 --- a/pubsubman_backend/src/lib.rs +++ b/pubsubman_backend/src/lib.rs @@ -6,7 +6,7 @@ use google_cloud_pubsub::{ client::{Client, ClientConfig}, subscription::SubscriptionConfig, }; -use message::{BackendMessage, FrontendMessage}; +use message::{BackendError, BackendMessage, FrontendMessage}; use model::{PubsubMessageToPublish, SubscriptionName, TopicName}; use tokio::{ runtime::{Builder, Runtime}, @@ -78,18 +78,15 @@ impl Backend { let client = self.client.clone(); self.rt.spawn(async move { - let topics = client + let message = client .get_topics(None) .await - .unwrap() - .into_iter() - .map(TopicName) - .collect(); + .map(|topics| { + BackendMessage::TopicsUpdated(topics.into_iter().map(TopicName).collect()) + }) + .unwrap_or_else(|_| BackendMessage::Error(BackendError::GetTopicsFailed)); - back_tx - .send(BackendMessage::TopicsUpdated(topics)) - .await - .unwrap(); + back_tx.send(message).await.unwrap(); }); } @@ -98,25 +95,23 @@ impl Backend { let client = self.client.clone(); self.rt.spawn(async move { - let sub_name = format!("pubsubman-subscription-{}", Uuid::new_v4()); - - let subscription = client + let message = match client .create_subscription( - &sub_name, + &format!("pubsubman-subscription-{}", Uuid::new_v4()), &topic_name.0, SubscriptionConfig::default(), None, ) .await - .unwrap(); + { + Ok(subscription) => { + let fq_sub_name = subscription.fully_qualified_name().to_owned(); + BackendMessage::SubscriptionCreated(topic_name, SubscriptionName(fq_sub_name)) + } + Err(_) => BackendMessage::Error(BackendError::CreateSubscriptionFailed(topic_name)), + }; - back_tx - .send(BackendMessage::SubscriptionCreated( - topic_name, - SubscriptionName(subscription.fully_qualified_name().to_owned()), - )) - .await - .unwrap(); + back_tx.send(message).await.unwrap(); }); } @@ -158,19 +153,27 @@ impl Backend { let subscription = client.subscription(&sub_name.0); let pull_messages_future = async move { - let mut stream = subscription.subscribe(None).await.unwrap(); - - while let Some(message) = stream.next().await { - let _ = message.ack().await; - - back_tx - .send(BackendMessage::MessageReceived( - topic_name.clone(), - message.into(), - )) + match subscription.subscribe(None).await { + Ok(mut stream) => { + while let Some(message) = stream.next().await { + let _ = message.ack().await; + + back_tx + .send(BackendMessage::MessageReceived( + topic_name.clone(), + message.into(), + )) + .await + .unwrap(); + } + } + Err(_) => back_tx + .send(BackendMessage::Error(BackendError::StreamMessagesFailed( + topic_name, sub_name, + ))) .await - .unwrap(); - } + .unwrap(), + }; }; select! { @@ -181,13 +184,22 @@ impl Backend { } fn publish_message(&self, topic_name: TopicName, message: PubsubMessageToPublish) { + let back_tx = self.back_tx.clone(); let client = self.client.clone(); self.rt.spawn(async move { let topic = client.topic(&topic_name.0); let publisher = topic.new_publisher(None); let awaiter = publisher.publish(message.into()).await; - awaiter.get().await + + if let Err(_) = awaiter.get().await { + back_tx + .send(BackendMessage::Error(BackendError::PublishMessageFailed( + topic_name, + ))) + .await + .unwrap() + } }); } } diff --git a/pubsubman_backend/src/message.rs b/pubsubman_backend/src/message.rs index cfa595d..c72dde1 100644 --- a/pubsubman_backend/src/message.rs +++ b/pubsubman_backend/src/message.rs @@ -17,4 +17,13 @@ pub enum BackendMessage { SubscriptionCreated(TopicName, SubscriptionName), MessageReceived(TopicName, PubsubMessage), SubscriptionsDeleted(Vec>), + Error(BackendError), +} + +#[derive(Debug)] +pub enum BackendError { + GetTopicsFailed, + CreateSubscriptionFailed(TopicName), + StreamMessagesFailed(TopicName, SubscriptionName), + PublishMessageFailed(TopicName), }