diff --git a/amqprs/tests/test_callback.rs b/amqprs/tests/test_callback.rs index 4702ccc..c612522 100644 --- a/amqprs/tests/test_callback.rs +++ b/amqprs/tests/test_callback.rs @@ -1,8 +1,17 @@ use amqprs::{ - callbacks::{DefaultChannelCallback, DefaultConnectionCallback}, - channel::{ExchangeDeclareArguments, ExchangeType}, + callbacks::{ChannelCallback, DefaultChannelCallback, DefaultConnectionCallback}, + channel::{ + BasicConsumeArguments, Channel, ExchangeDeclareArguments, ExchangeType, + QueueDeclareArguments, QueueDeleteArguments, + }, connection::Connection, + consumer::DefaultConsumer, + error::Error, + Ack, BasicProperties, Cancel, CloseChannel, Nack, Return, }; +use async_trait::async_trait; +use std::{sync::Arc, time::Duration}; +use tokio::sync::Notify; mod common; @@ -55,3 +64,78 @@ async fn test_channel_callback() { let args = ExchangeDeclareArguments::of_type("amq.topic", ExchangeType::Topic); channel.exchange_declare(args).await.unwrap(); } + +#[tokio::test] +async fn test_channel_callback_close() { + common::setup_logging(); + + // open a connection to RabbitMQ server + let args = common::build_conn_args(); + let connection = Connection::open(&args).await.unwrap(); + + // open a channel on the connection + let channel = connection.open_channel(None).await.unwrap(); + let cancel_notify = Arc::new(Notify::new()); + channel + .register_callback(ChannelCancelCallback { + cancel_notify: Arc::clone(&cancel_notify), + }) + .await + .unwrap(); + + // declare queue + const QUEUE_NAME: &str = "test-channel-callback-close"; + let args = QueueDeclareArguments::default() + .queue(QUEUE_NAME.to_string()) + .finish(); + channel.queue_declare(args).await.unwrap(); + + // start consumer + let args = BasicConsumeArguments::default() + .queue(QUEUE_NAME.to_string()) + .finish(); + channel + .basic_consume(DefaultConsumer::new(true), args) + .await + .unwrap(); + + // delete queue so RabbitMQ produce consumer cancel notification + let args = QueueDeleteArguments::default() + .queue(QUEUE_NAME.to_string()) + .finish(); + channel.queue_delete(args).await.unwrap(); + + // wait for callback + tokio::time::timeout(Duration::from_secs(5), cancel_notify.notified()) + .await + .expect("ChannelCallback::cancel() should be called when queue was deleted"); +} + +struct ChannelCancelCallback { + cancel_notify: Arc, +} + +#[async_trait] +impl ChannelCallback for ChannelCancelCallback { + async fn close(&mut self, _channel: &Channel, _close: CloseChannel) -> Result<(), Error> { + Ok(()) + } + async fn cancel(&mut self, _channel: &Channel, _cancel: Cancel) -> Result<(), Error> { + self.cancel_notify.notify_one(); + + Ok(()) + } + async fn flow(&mut self, _channel: &Channel, active: bool) -> Result { + Ok(active) + } + async fn publish_ack(&mut self, _channel: &Channel, _ack: Ack) {} + async fn publish_nack(&mut self, _channel: &Channel, _nack: Nack) {} + async fn publish_return( + &mut self, + _channel: &Channel, + _ret: Return, + _basic_properties: BasicProperties, + _content: Vec, + ) { + } +}