From 915b6736a8e6ef5804f3ccb4bc87f28086d71363 Mon Sep 17 00:00:00 2001 From: OverworkedCriminal Date: Mon, 30 Oct 2023 15:23:31 +0100 Subject: [PATCH 1/2] Add consumer_cancel_notify to client_capabilities in Connection::open() --- amqprs/src/api/connection.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/amqprs/src/api/connection.rs b/amqprs/src/api/connection.rs index 93a5ef9..3e8ce1d 100644 --- a/amqprs/src/api/connection.rs +++ b/amqprs/src/api/connection.rs @@ -606,6 +606,15 @@ impl Connection { "version".try_into().unwrap(), FieldValue::S("0.1".try_into().unwrap()), ); + let mut client_properties_capabilities = FieldTable::new(); + client_properties_capabilities.insert( + "consumer_cancel_notify".try_into().unwrap(), + true.into() + ); + client_properties.insert( + "capabilities".try_into().unwrap(), + FieldValue::F(client_properties_capabilities) + ); // S: `Start` C: `StartOk` let server_properties = From 89ea076d6be9352f3064e6dd04a145a1da9cc202 Mon Sep 17 00:00:00 2001 From: OverworkedCriminal Date: Mon, 30 Oct 2023 16:22:45 +0100 Subject: [PATCH 2/2] Test - ChannelCallback::cancel() is called after deleting queue --- amqprs/tests/test_callback.rs | 88 ++++++++++++++++++++++++++++++++++- 1 file changed, 86 insertions(+), 2 deletions(-) diff --git a/amqprs/tests/test_callback.rs b/amqprs/tests/test_callback.rs index 4702ccc..c612522 100644 --- a/amqprs/tests/test_callback.rs +++ b/amqprs/tests/test_callback.rs @@ -1,8 +1,17 @@ use amqprs::{ - callbacks::{DefaultChannelCallback, DefaultConnectionCallback}, - channel::{ExchangeDeclareArguments, ExchangeType}, + callbacks::{ChannelCallback, DefaultChannelCallback, DefaultConnectionCallback}, + channel::{ + BasicConsumeArguments, Channel, ExchangeDeclareArguments, ExchangeType, + QueueDeclareArguments, QueueDeleteArguments, + }, connection::Connection, + consumer::DefaultConsumer, + error::Error, + Ack, BasicProperties, Cancel, CloseChannel, Nack, Return, }; +use async_trait::async_trait; +use std::{sync::Arc, time::Duration}; +use tokio::sync::Notify; mod common; @@ -55,3 +64,78 @@ async fn test_channel_callback() { let args = ExchangeDeclareArguments::of_type("amq.topic", ExchangeType::Topic); channel.exchange_declare(args).await.unwrap(); } + +#[tokio::test] +async fn test_channel_callback_close() { + common::setup_logging(); + + // open a connection to RabbitMQ server + let args = common::build_conn_args(); + let connection = Connection::open(&args).await.unwrap(); + + // open a channel on the connection + let channel = connection.open_channel(None).await.unwrap(); + let cancel_notify = Arc::new(Notify::new()); + channel + .register_callback(ChannelCancelCallback { + cancel_notify: Arc::clone(&cancel_notify), + }) + .await + .unwrap(); + + // declare queue + const QUEUE_NAME: &str = "test-channel-callback-close"; + let args = QueueDeclareArguments::default() + .queue(QUEUE_NAME.to_string()) + .finish(); + channel.queue_declare(args).await.unwrap(); + + // start consumer + let args = BasicConsumeArguments::default() + .queue(QUEUE_NAME.to_string()) + .finish(); + channel + .basic_consume(DefaultConsumer::new(true), args) + .await + .unwrap(); + + // delete queue so RabbitMQ produce consumer cancel notification + let args = QueueDeleteArguments::default() + .queue(QUEUE_NAME.to_string()) + .finish(); + channel.queue_delete(args).await.unwrap(); + + // wait for callback + tokio::time::timeout(Duration::from_secs(5), cancel_notify.notified()) + .await + .expect("ChannelCallback::cancel() should be called when queue was deleted"); +} + +struct ChannelCancelCallback { + cancel_notify: Arc, +} + +#[async_trait] +impl ChannelCallback for ChannelCancelCallback { + async fn close(&mut self, _channel: &Channel, _close: CloseChannel) -> Result<(), Error> { + Ok(()) + } + async fn cancel(&mut self, _channel: &Channel, _cancel: Cancel) -> Result<(), Error> { + self.cancel_notify.notify_one(); + + Ok(()) + } + async fn flow(&mut self, _channel: &Channel, active: bool) -> Result { + Ok(active) + } + async fn publish_ack(&mut self, _channel: &Channel, _ack: Ack) {} + async fn publish_nack(&mut self, _channel: &Channel, _nack: Nack) {} + async fn publish_return( + &mut self, + _channel: &Channel, + _ret: Return, + _basic_properties: BasicProperties, + _content: Vec, + ) { + } +}