Skip to content

Commit

Permalink
Merge pull request #113 from OverworkedCriminal/client_capabilities
Browse files Browse the repository at this point in the history
Add consumer_cancel_notify to client_capabilities in Connection::open()
  • Loading branch information
gftea authored Oct 31, 2023
2 parents ce10318 + 89ea076 commit 58d8425
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 2 deletions.
9 changes: 9 additions & 0 deletions amqprs/src/api/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,15 @@ impl Connection {
"version".try_into().unwrap(),
FieldValue::S("0.1".try_into().unwrap()),
);
let mut client_properties_capabilities = FieldTable::new();
client_properties_capabilities.insert(
"consumer_cancel_notify".try_into().unwrap(),
true.into()
);
client_properties.insert(
"capabilities".try_into().unwrap(),
FieldValue::F(client_properties_capabilities)
);

// S: `Start` C: `StartOk`
let server_properties =
Expand Down
88 changes: 86 additions & 2 deletions amqprs/tests/test_callback.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
use amqprs::{
callbacks::{DefaultChannelCallback, DefaultConnectionCallback},
channel::{ExchangeDeclareArguments, ExchangeType},
callbacks::{ChannelCallback, DefaultChannelCallback, DefaultConnectionCallback},
channel::{
BasicConsumeArguments, Channel, ExchangeDeclareArguments, ExchangeType,
QueueDeclareArguments, QueueDeleteArguments,
},
connection::Connection,
consumer::DefaultConsumer,
error::Error,
Ack, BasicProperties, Cancel, CloseChannel, Nack, Return,
};
use async_trait::async_trait;
use std::{sync::Arc, time::Duration};
use tokio::sync::Notify;

mod common;

Expand Down Expand Up @@ -55,3 +64,78 @@ async fn test_channel_callback() {
let args = ExchangeDeclareArguments::of_type("amq.topic", ExchangeType::Topic);
channel.exchange_declare(args).await.unwrap();
}

#[tokio::test]
async fn test_channel_callback_close() {
common::setup_logging();

// open a connection to RabbitMQ server
let args = common::build_conn_args();
let connection = Connection::open(&args).await.unwrap();

// open a channel on the connection
let channel = connection.open_channel(None).await.unwrap();
let cancel_notify = Arc::new(Notify::new());
channel
.register_callback(ChannelCancelCallback {
cancel_notify: Arc::clone(&cancel_notify),
})
.await
.unwrap();

// declare queue
const QUEUE_NAME: &str = "test-channel-callback-close";
let args = QueueDeclareArguments::default()
.queue(QUEUE_NAME.to_string())
.finish();
channel.queue_declare(args).await.unwrap();

// start consumer
let args = BasicConsumeArguments::default()
.queue(QUEUE_NAME.to_string())
.finish();
channel
.basic_consume(DefaultConsumer::new(true), args)
.await
.unwrap();

// delete queue so RabbitMQ produce consumer cancel notification
let args = QueueDeleteArguments::default()
.queue(QUEUE_NAME.to_string())
.finish();
channel.queue_delete(args).await.unwrap();

// wait for callback
tokio::time::timeout(Duration::from_secs(5), cancel_notify.notified())
.await
.expect("ChannelCallback::cancel() should be called when queue was deleted");
}

struct ChannelCancelCallback {
cancel_notify: Arc<Notify>,
}

#[async_trait]
impl ChannelCallback for ChannelCancelCallback {
async fn close(&mut self, _channel: &Channel, _close: CloseChannel) -> Result<(), Error> {
Ok(())
}
async fn cancel(&mut self, _channel: &Channel, _cancel: Cancel) -> Result<(), Error> {
self.cancel_notify.notify_one();

Ok(())
}
async fn flow(&mut self, _channel: &Channel, active: bool) -> Result<bool, Error> {
Ok(active)
}
async fn publish_ack(&mut self, _channel: &Channel, _ack: Ack) {}
async fn publish_nack(&mut self, _channel: &Channel, _nack: Nack) {}
async fn publish_return(
&mut self,
_channel: &Channel,
_ret: Return,
_basic_properties: BasicProperties,
_content: Vec<u8>,
) {
}
}

0 comments on commit 58d8425

Please sign in to comment.