diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b42d5e73..f3914872 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -42,6 +42,12 @@ jobs: run: cargo test --locked --all-features --all-targets env: # set this explicitly so integration tests will run FAKTORY_URL: tcp://127.0.0.1:7419 + # commands executed during the following test affect all the queues on the Faktory server, + # so we perform this test in a dedicated - isolated - step, re-using the the Faktory container + - name: cargo test --locked (queue control actions) + run: cargo test --locked --all-features --all-targets queue_control_actions_wildcard -- --include-ignored + env: # set this explicitly so integration tests will run + FAKTORY_URL: tcp://127.0.0.1:7419 # https://github.com/rust-lang/cargo/issues/6669 - name: cargo test --doc run: cargo test --locked --all-features --doc diff --git a/src/proto/client/mod.rs b/src/proto/client/mod.rs index 997ecd9e..162c9b5d 100644 --- a/src/proto/client/mod.rs +++ b/src/proto/client/mod.rs @@ -312,6 +312,20 @@ where }, } } + + pub(crate) async fn perform_queue_action( + &mut self, + queues: &[Q], + action: QueueAction, + ) -> Result<(), Error> + where + Q: AsRef + Sync, + { + self.issue(&QueueControl::new(action, queues)) + .await? + .read_ok() + .await + } } impl Client @@ -370,25 +384,54 @@ where } /// Pause the given queues. + /// + /// Passing a wildcard `&["*"]` as the value of the `queues` parameter + /// will pause all the queues. To be more explicit, you may want to call [`Client::queue_pause_all`] + /// shortcut method to pause all the queues. pub async fn queue_pause(&mut self, queues: &[Q]) -> Result<(), Error> where Q: AsRef + Sync, { - self.issue(&QueueControl::new(QueueAction::Pause, queues)) - .await? - .read_ok() - .await + self.perform_queue_action(queues, QueueAction::Pause).await + } + + /// Pause all queues. + pub async fn queue_pause_all(&mut self) -> Result<(), Error> { + self.perform_queue_action(&["*"], QueueAction::Pause).await } /// Resume the given queues. + /// + /// Passing a wildcard `&["*"]` as the value of the `queues` parameter + /// will resume all the queues. To be more explicit, you may want to call [`Client::queue_resume_all`] + /// shortcut method to resume all the queues. pub async fn queue_resume(&mut self, queues: &[Q]) -> Result<(), Error> where Q: AsRef + Sync, { - self.issue(&QueueControl::new(QueueAction::Resume, queues)) - .await? - .read_ok() - .await + self.perform_queue_action(queues, QueueAction::Resume).await + } + + /// Resume all queues. + pub async fn queue_resume_all(&mut self) -> Result<(), Error> { + self.perform_queue_action(&["*"], QueueAction::Resume).await + } + + /// Remove the given queues. + /// + /// Beware, passing a wildcard `&["*"]` as the value of the `queues` parameter + /// will **remove** all the queues. To be more explicit, you may want to call [`Client::queue_remove_all`] + /// shortcut method to remove all the queues. + pub async fn queue_remove(&mut self, queues: &[Q]) -> Result<(), Error> + where + Q: AsRef + Sync, + { + self.perform_queue_action(queues, QueueAction::Remove).await + } + + /// Remove all queues. + pub async fn queue_remove_all(&mut self) -> Result<(), Error> { + self.perform_queue_action(&["*"], QueueAction::Remove).await } } diff --git a/src/proto/single/cmd.rs b/src/proto/single/cmd.rs index 54fb7115..13efb0ab 100644 --- a/src/proto/single/cmd.rs +++ b/src/proto/single/cmd.rs @@ -279,6 +279,7 @@ impl FaktoryCommand for PushBulk { pub(crate) enum QueueAction { Pause, Resume, + Remove, } pub(crate) struct QueueControl<'a, S> @@ -298,6 +299,7 @@ where let command = match self.action { QueueAction::Pause => b"QUEUE PAUSE".as_ref(), QueueAction::Resume => b"QUEUE RESUME".as_ref(), + QueueAction::Remove => b"QUEUE REMOVE".as_ref(), }; w.write_all(command).await?; write_queues(w, self.queues).await?; diff --git a/tests/real/community.rs b/tests/real/community.rs index eabf693f..bca63e41 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -253,41 +253,192 @@ async fn fail() { } #[tokio::test(flavor = "multi_thread")] -async fn queue() { +async fn queue_control_actions() { skip_check!(); - let local = "pause"; + + let local_1 = "queue_control_pause_and_resume_1"; + let local_2 = "queue_control_pause_and_resume_2"; let (tx, rx) = sync::mpsc::channel(); - let tx = sync::Arc::new(sync::Mutex::new(tx)); + let tx_1 = sync::Arc::new(sync::Mutex::new(tx)); + let tx_2 = sync::Arc::clone(&tx_1); - let mut w = WorkerBuilder::default() + let mut worker = WorkerBuilder::default() .hostname("tester".to_string()) - .wid(WorkerId::new(local)) - .register_fn(local, move |_job| { - let tx = sync::Arc::clone(&tx); + .wid(WorkerId::new(local_1)) + .register_fn(local_1, move |_job| { + let tx = sync::Arc::clone(&tx_1); + Box::pin(async move { tx.lock().unwrap().send(true) }) + }) + .register_fn(local_2, move |_job| { + let tx = sync::Arc::clone(&tx_2); Box::pin(async move { tx.lock().unwrap().send(true) }) }) .connect(None) .await .unwrap(); - let mut p = Client::connect(None).await.unwrap(); - p.enqueue(Job::new(local, vec![Value::from(1)]).on_queue(local)) + let mut client = Client::connect(None).await.unwrap(); + + // enqueue three jobs + client + .enqueue_many([ + Job::new(local_1, vec![Value::from(1)]).on_queue(local_1), + Job::new(local_1, vec![Value::from(1)]).on_queue(local_1), + Job::new(local_1, vec![Value::from(1)]).on_queue(local_1), + ]) .await .unwrap(); - p.queue_pause(&[local]).await.unwrap(); - let had_job = w.run_one(0, &[local]).await.unwrap(); + // pause the queue + client.queue_pause(&[local_1]).await.unwrap(); + + // try to consume from that queue + let had_job = worker.run_one(0, &[local_1]).await.unwrap(); assert!(!had_job); let worker_executed = rx.try_recv().is_ok(); assert!(!worker_executed); - p.queue_resume(&[local]).await.unwrap(); + // resume that queue and ... + client.queue_resume(&[local_1]).await.unwrap(); - let had_job = w.run_one(0, &[local]).await.unwrap(); + // ... be able to consume from it + let had_job = worker.run_one(0, &[local_1]).await.unwrap(); assert!(had_job); let worker_executed = rx.try_recv().is_ok(); assert!(worker_executed); + + // push two jobs on the other queue (reminder: we got two jobs + // remaining on the first queue): + client + .enqueue_many([ + Job::new(local_2, vec![Value::from(1)]).on_queue(local_2), + Job::new(local_2, vec![Value::from(1)]).on_queue(local_2), + ]) + .await + .unwrap(); + + // pause both queues the queues + client.queue_pause(&[local_1, local_2]).await.unwrap(); + + // try to consume from them + assert!(!worker.run_one(0, &[local_1]).await.unwrap()); + assert!(!worker.run_one(0, &[local_2]).await.unwrap()); + assert!(!rx.try_recv().is_ok()); + + // now, resume the queues and ... + client.queue_resume(&[local_1, local_2]).await.unwrap(); + + // ... be able to consume from both of them + assert!(worker.run_one(0, &[local_1]).await.unwrap()); + assert!(rx.try_recv().is_ok()); + assert!(worker.run_one(0, &[local_2]).await.unwrap()); + assert!(rx.try_recv().is_ok()); + + // let's inspect the sever state + let server_state = client.info().await.unwrap(); + let queues = &server_state.get("faktory").unwrap().get("queues").unwrap(); + assert_eq!(*queues.get(local_1).unwrap(), 1); // 1 job remaining + assert_eq!(*queues.get(local_2).unwrap(), 1); // also 1 job remaining + + // let's now remove the queues + client.queue_remove(&[local_1, local_2]).await.unwrap(); + + // though there _was_ a job in each queue, consuming from + // the removed queues will not yield anything + assert!(!worker.run_one(0, &[local_1]).await.unwrap()); + assert!(!worker.run_one(0, &[local_2]).await.unwrap()); + assert!(!rx.try_recv().is_ok()); + + // let's inspect the sever state again + let server_state = client.info().await.unwrap(); + let queues = &server_state.get("faktory").unwrap().get("queues").unwrap(); + // our queue are not even mentioned in the server report: + assert!(queues.get(local_1).is_none()); + assert!(queues.get(local_2).is_none()); +} + +// Run the following test with: +// FAKTORY_URL=tcp://127.0.0.1:7419 cargo test --locked --all-features --all-targets queue_control_actions_wildcard -- --include-ignored +#[tokio::test(flavor = "multi_thread")] +#[ignore = "this test requires a dedicated test run since the commands being tested will affect all queues on the Faktory server"] +async fn queue_control_actions_wildcard() { + skip_check!(); + + let local_1 = "queue_control_wildcard_1"; + let local_2 = "queue_control_wildcard_2"; + + let (tx, rx) = sync::mpsc::channel(); + let tx_1 = sync::Arc::new(sync::Mutex::new(tx)); + let tx_2 = sync::Arc::clone(&tx_1); + + let mut worker = WorkerBuilder::default() + .hostname("tester".to_string()) + .wid(WorkerId::new(local_1)) + .register_fn(local_1, move |_job| { + let tx = sync::Arc::clone(&tx_1); + Box::pin(async move { tx.lock().unwrap().send(true) }) + }) + .register_fn(local_2, move |_job| { + let tx = sync::Arc::clone(&tx_2); + Box::pin(async move { tx.lock().unwrap().send(true) }) + }) + .connect(None) + .await + .unwrap(); + + let mut client = Client::connect(None).await.unwrap(); + + // enqueue two jobs on each queue + client + .enqueue_many([ + Job::new(local_1, vec![Value::from(1)]).on_queue(local_1), + Job::new(local_1, vec![Value::from(1)]).on_queue(local_1), + Job::new(local_2, vec![Value::from(1)]).on_queue(local_2), + Job::new(local_2, vec![Value::from(1)]).on_queue(local_2), + ]) + .await + .unwrap(); + + // pause all queues the queues + client.queue_pause_all().await.unwrap(); + + // try to consume from queues + assert!(!worker.run_one(0, &[local_1]).await.unwrap()); + assert!(!worker.run_one(0, &[local_2]).await.unwrap()); + assert!(!rx.try_recv().is_ok()); + + // now, resume all the queues and ... + client.queue_resume_all().await.unwrap(); + + // ... be able to consume from both of them + assert!(worker.run_one(0, &[local_1]).await.unwrap()); + assert!(rx.try_recv().is_ok()); + assert!(worker.run_one(0, &[local_2]).await.unwrap()); + assert!(rx.try_recv().is_ok()); + + // let's inspect the sever state + let server_state = client.info().await.unwrap(); + let queues = &server_state.get("faktory").unwrap().get("queues").unwrap(); + assert_eq!(*queues.get(local_1).unwrap(), 1); // 1 job remaining + assert_eq!(*queues.get(local_2).unwrap(), 1); // also 1 job remaining + + // let's now remove all the queues + client.queue_remove_all().await.unwrap(); + + // though there _was_ a job in each queue, consuming from + // the removed queues will not yield anything + assert!(!worker.run_one(0, &[local_1]).await.unwrap()); + assert!(!worker.run_one(0, &[local_2]).await.unwrap()); + assert!(!rx.try_recv().is_ok()); + + // let's inspect the sever state again + let server_state = client.info().await.unwrap(); + let queues = &server_state.get("faktory").unwrap().get("queues").unwrap(); + + // our queue are not even mentioned in the server report: + assert!(queues.get(local_1).is_none()); + assert!(queues.get(local_2).is_none()); } #[tokio::test(flavor = "multi_thread")]