diff --git a/sdk/core/azure_core_amqp/src/fe2o3/receiver.rs b/sdk/core/azure_core_amqp/src/fe2o3/receiver.rs index fad4536f6a..938b322310 100644 --- a/sdk/core/azure_core_amqp/src/fe2o3/receiver.rs +++ b/sdk/core/azure_core_amqp/src/fe2o3/receiver.rs @@ -2,19 +2,19 @@ // Licensed under the MIT license. //cspell: words amqp -use super::error::{AmqpIllegalLinkState, AmqpReceiver, AmqpReceiverAttach}; +use super::error::{AmqpIllegalLinkState, AmqpLinkDetach, AmqpReceiver, AmqpReceiverAttach}; use crate::messaging::{AmqpMessage, AmqpSource}; use crate::receiver::{AmqpReceiverApis, AmqpReceiverOptions, ReceiverCreditMode}; use crate::session::AmqpSession; use async_std::sync::Mutex; use azure_core::error::Result; use std::borrow::BorrowMut; -use std::sync::{Arc, OnceLock}; -use tracing::trace; +use std::sync::OnceLock; +use tracing::{info, trace, warn}; #[derive(Default)] pub(crate) struct Fe2o3AmqpReceiver { - receiver: OnceLock>>, + receiver: OnceLock>, } impl From for fe2o3_amqp::link::receiver::CreditMode { @@ -58,30 +58,40 @@ impl AmqpReceiverApis for Fe2o3AmqpReceiver { .attach(session.implementation.get()?.lock().await.borrow_mut()) .await .map_err(AmqpReceiverAttach::from)?; - self.receiver - .set(Arc::new(Mutex::new(receiver))) - .map_err(|_| { - azure_core::Error::message( - azure_core::error::ErrorKind::Other, - "Could not set message receiver.", - ) - })?; + self.receiver.set(Mutex::new(receiver)).map_err(|_| { + azure_core::Error::message( + azure_core::error::ErrorKind::Other, + "Could not set message receiver.", + ) + })?; Ok(()) } - async fn max_message_size(&self) -> Result> { - Ok(self - .receiver - .get() - .ok_or_else(|| { - azure_core::Error::message( - azure_core::error::ErrorKind::Other, - "Message receiver is not set", - ) - })? - .lock() + async fn detach(mut self) -> Result<()> { + let receiver = self.receiver.take().ok_or_else(|| { + azure_core::Error::message( + azure_core::error::ErrorKind::Other, + "Message Sender not set.", + ) + })?; + let res = receiver + .into_inner() + .detach() .await - .max_message_size()) + .map_err(|e| AmqpLinkDetach::from(e.1)); + match res { + Ok(_) => Ok(()), + Err(e) => match e.0 { + fe2o3_amqp::link::DetachError::ClosedByRemote => { + info!("Error detaching receiver: {:?} - ignored", e); + Ok(()) + } + _ => { + warn!("Error detaching receiver: {:?}", e); + Err(e.into()) + } + }, + } } async fn receive(&self) -> Result { diff --git a/sdk/core/azure_core_amqp/src/fe2o3/sender.rs b/sdk/core/azure_core_amqp/src/fe2o3/sender.rs index 80e5bd7a74..8277e2c409 100644 --- a/sdk/core/azure_core_amqp/src/fe2o3/sender.rs +++ b/sdk/core/azure_core_amqp/src/fe2o3/sender.rs @@ -2,6 +2,10 @@ // Licensed under the MIT license. //cspell: words amqp +use super::error::{ + AmqpDeliveryRejected, AmqpLinkDetach, AmqpNotAccepted, AmqpSenderAttach, AmqpSenderSend, + Fe2o3AmqpError, +}; use crate::messaging::{AmqpMessage, AmqpTarget}; use crate::sender::{AmqpSendOptions, AmqpSenderApis, AmqpSenderOptions}; use crate::session::AmqpSession; @@ -9,11 +13,7 @@ use async_std::sync::Mutex; use azure_core::Result; use std::borrow::BorrowMut; use std::sync::OnceLock; - -use super::error::{ - AmqpDeliveryRejected, AmqpLinkDetach, AmqpNotAccepted, AmqpSenderAttach, AmqpSenderSend, - Fe2o3AmqpError, -}; +use tracing::{info, warn}; #[derive(Default)] pub(crate) struct Fe2o3AmqpSender { @@ -84,12 +84,24 @@ impl AmqpSenderApis for Fe2o3AmqpSender { "Message Sender not set.", ) })?; - sender + let res = sender .into_inner() .detach() .await - .map_err(|e| AmqpLinkDetach::from(e.1))?; - Ok(()) + .map_err(|e| AmqpLinkDetach::from(e.1)); + match res { + Ok(_) => Ok(()), + Err(e) => match e.0 { + fe2o3_amqp::link::DetachError::ClosedByRemote => { + info!("Error detaching sender: {:?}", e); + Ok(()) + } + _ => { + warn!("Error detaching sender: {:?}", e); + Err(e.into()) + } + }, + } } fn max_message_size(&self) -> azure_core::Result> { diff --git a/sdk/core/azure_core_amqp/src/noop.rs b/sdk/core/azure_core_amqp/src/noop.rs index d3cd4c5f68..136bd0ed41 100644 --- a/sdk/core/azure_core_amqp/src/noop.rs +++ b/sdk/core/azure_core_amqp/src/noop.rs @@ -181,7 +181,7 @@ impl AmqpReceiverApis for NoopAmqpReceiver { unimplemented!(); } - async fn max_message_size(&self) -> Result> { + async fn detach(self) -> Result<()> { unimplemented!(); } diff --git a/sdk/core/azure_core_amqp/src/receiver.rs b/sdk/core/azure_core_amqp/src/receiver.rs index 5e9e5e031a..079acf4898 100644 --- a/sdk/core/azure_core_amqp/src/receiver.rs +++ b/sdk/core/azure_core_amqp/src/receiver.rs @@ -55,7 +55,7 @@ pub trait AmqpReceiverApis { source: impl Into, options: Option, ) -> impl std::future::Future>; - fn max_message_size(&self) -> impl std::future::Future>>; + fn detach(self) -> impl std::future::Future>; fn receive(&self) -> impl std::future::Future>; } @@ -73,8 +73,8 @@ impl AmqpReceiverApis for AmqpReceiver { ) -> Result<()> { self.implementation.attach(session, source, options).await } - async fn max_message_size(&self) -> Result> { - self.implementation.max_message_size().await + async fn detach(self) -> Result<()> { + self.implementation.detach().await } async fn receive(&self) -> Result { self.implementation.receive().await