Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Detach for AMQP message receiver; Remove MessageReceiver from an Arc because it doesn't need to be in one. #1927

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/core/azure_core_amqp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ This crate is part of a collection of crates: for more information please refer

## Example

```rust,no_run
```rust no_run
LarryOsterman marked this conversation as resolved.
Show resolved Hide resolved
use azure_messaging::*;

#[tokio::main]
Expand Down
58 changes: 34 additions & 24 deletions sdk/core/azure_core_amqp/src/fe2o3/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@
// Licensed under the MIT license.
//cspell: words amqp

use super::error::{AmqpIllegalLinkState, AmqpReceiver, AmqpReceiverAttach};
use super::error::{AmqpIllegalLinkState, AmqpLinkDetach, AmqpReceiver, AmqpReceiverAttach};
use crate::messaging::{AmqpMessage, AmqpSource};
use crate::receiver::{AmqpReceiverApis, AmqpReceiverOptions, ReceiverCreditMode};
use crate::session::AmqpSession;
use async_std::sync::Mutex;
use azure_core::error::Result;
use std::borrow::BorrowMut;
use std::sync::{Arc, OnceLock};
use tracing::trace;
use std::sync::OnceLock;
use tracing::{info, trace, warn};

#[derive(Default)]
pub(crate) struct Fe2o3AmqpReceiver {
receiver: OnceLock<Arc<Mutex<fe2o3_amqp::Receiver>>>,
receiver: OnceLock<Mutex<fe2o3_amqp::Receiver>>,
}

impl From<ReceiverCreditMode> for fe2o3_amqp::link::receiver::CreditMode {
Expand Down Expand Up @@ -58,30 +58,40 @@ impl AmqpReceiverApis for Fe2o3AmqpReceiver {
.attach(session.implementation.get()?.lock().await.borrow_mut())
.await
.map_err(AmqpReceiverAttach::from)?;
self.receiver
.set(Arc::new(Mutex::new(receiver)))
.map_err(|_| {
azure_core::Error::message(
azure_core::error::ErrorKind::Other,
"Could not set message receiver.",
)
})?;
self.receiver.set(Mutex::new(receiver)).map_err(|_| {
azure_core::Error::message(
azure_core::error::ErrorKind::Other,
"Could not set message receiver.",
)
})?;
Ok(())
}

async fn max_message_size(&self) -> Result<Option<u64>> {
Ok(self
.receiver
.get()
.ok_or_else(|| {
azure_core::Error::message(
azure_core::error::ErrorKind::Other,
"Message receiver is not set",
)
})?
.lock()
async fn detach(mut self) -> Result<()> {
let receiver = self.receiver.take().ok_or_else(|| {
azure_core::Error::message(
azure_core::error::ErrorKind::Other,
"Message Sender not set.",
)
})?;
let res = receiver
.into_inner()
.detach()
.await
.max_message_size())
.map_err(|e| AmqpLinkDetach::from(e.1));
match res {
Ok(_) => Ok(()),
Err(e) => match e.0 {
fe2o3_amqp::link::DetachError::ClosedByRemote => {
info!("Error detaching receiver: {:?} - ignored", e);
Ok(())
}
_ => {
warn!("Error detaching receiver: {:?}", e);
Err(e.into())
}
},
}
}

async fn receive(&self) -> Result<AmqpMessage> {
Expand Down
28 changes: 20 additions & 8 deletions sdk/core/azure_core_amqp/src/fe2o3/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@
// Licensed under the MIT license.
//cspell: words amqp

use super::error::{
AmqpDeliveryRejected, AmqpLinkDetach, AmqpNotAccepted, AmqpSenderAttach, AmqpSenderSend,
Fe2o3AmqpError,
};
use crate::messaging::{AmqpMessage, AmqpTarget};
use crate::sender::{AmqpSendOptions, AmqpSenderApis, AmqpSenderOptions};
use crate::session::AmqpSession;
use async_std::sync::Mutex;
use azure_core::Result;
use std::borrow::BorrowMut;
use std::sync::OnceLock;

use super::error::{
AmqpDeliveryRejected, AmqpLinkDetach, AmqpNotAccepted, AmqpSenderAttach, AmqpSenderSend,
Fe2o3AmqpError,
};
use tracing::{info, warn};

#[derive(Default)]
pub(crate) struct Fe2o3AmqpSender {
Expand Down Expand Up @@ -84,12 +84,24 @@ impl AmqpSenderApis for Fe2o3AmqpSender {
"Message Sender not set.",
)
})?;
sender
let res = sender
.into_inner()
.detach()
.await
.map_err(|e| AmqpLinkDetach::from(e.1))?;
Ok(())
.map_err(|e| AmqpLinkDetach::from(e.1));
match res {
Ok(_) => Ok(()),
Err(e) => match e.0 {
fe2o3_amqp::link::DetachError::ClosedByRemote => {
info!("Error detaching sender: {:?}", e);
Ok(())
}
_ => {
warn!("Error detaching sender: {:?}", e);
Err(e.into())
}
},
}
}

fn max_message_size(&self) -> azure_core::Result<Option<u64>> {
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/azure_core_amqp/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ impl AmqpReceiverApis for NoopAmqpReceiver {
unimplemented!();
}

async fn max_message_size(&self) -> Result<Option<u64>> {
async fn detach(self) -> Result<()> {
unimplemented!();
}

Expand Down
6 changes: 3 additions & 3 deletions sdk/core/azure_core_amqp/src/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub trait AmqpReceiverApis {
source: impl Into<AmqpSource>,
options: Option<AmqpReceiverOptions>,
) -> impl std::future::Future<Output = Result<()>>;
fn max_message_size(&self) -> impl std::future::Future<Output = Result<Option<u64>>>;
fn detach(self) -> impl std::future::Future<Output = Result<()>>;
fn receive(&self) -> impl std::future::Future<Output = Result<AmqpMessage>>;
}

Expand All @@ -73,8 +73,8 @@ impl AmqpReceiverApis for AmqpReceiver {
) -> Result<()> {
self.implementation.attach(session, source, options).await
}
async fn max_message_size(&self) -> Result<Option<u64>> {
self.implementation.max_message_size().await
async fn detach(self) -> Result<()> {
self.implementation.detach().await
}
async fn receive(&self) -> Result<AmqpMessage> {
self.implementation.receive().await
Expand Down