diff --git a/fang/fang_examples/asynk/simple_async_worker/src/lib.rs b/fang/fang_examples/asynk/simple_async_worker/src/lib.rs index cfd269b0..2a203920 100644 --- a/fang/fang_examples/asynk/simple_async_worker/src/lib.rs +++ b/fang/fang_examples/asynk/simple_async_worker/src/lib.rs @@ -33,7 +33,7 @@ impl MyFailingTask { #[async_trait] #[typetag::serde] impl AsyncRunnable for MyTask { - async fn run(&self, queue: &mut dyn AsyncQueueable) -> Result<(), FangError> { + async fn run(&self, queue: &dyn AsyncQueueable) -> Result<(), FangError> { let new_task = MyTask::new(self.number + 1); queue .insert_task(&new_task as &dyn AsyncRunnable) @@ -50,7 +50,7 @@ impl AsyncRunnable for MyTask { #[async_trait] #[typetag::serde] impl AsyncRunnable for MyFailingTask { - async fn run(&self, queue: &mut dyn AsyncQueueable) -> Result<(), FangError> { + async fn run(&self, queue: &dyn AsyncQueueable) -> Result<(), FangError> { let new_task = MyFailingTask::new(self.number + 1); queue .insert_task(&new_task as &dyn AsyncRunnable) diff --git a/fang/fang_examples/asynk/simple_cron_async_worker/src/lib.rs b/fang/fang_examples/asynk/simple_cron_async_worker/src/lib.rs index 2bb972b4..2efc55ad 100644 --- a/fang/fang_examples/asynk/simple_cron_async_worker/src/lib.rs +++ b/fang/fang_examples/asynk/simple_cron_async_worker/src/lib.rs @@ -13,7 +13,7 @@ pub struct MyCronTask {} #[async_trait] #[typetag::serde] impl AsyncRunnable for MyCronTask { - async fn run(&self, _queue: &mut dyn AsyncQueueable) -> Result<(), FangError> { + async fn run(&self, _queue: &dyn AsyncQueueable) -> Result<(), FangError> { log::info!("CRON!!!!!!!!!!!!!!!",); Ok(()) diff --git a/fang/src/asynk/async_queue.rs b/fang/src/asynk/async_queue.rs index 5c76052e..c44abb0a 100644 --- a/fang/src/asynk/async_queue.rs +++ b/fang/src/asynk/async_queue.rs @@ -79,61 +79,57 @@ impl From for AsyncQueueError { /// This is implemented by the `AsyncQueue` struct which uses internally a `AnyPool` of `sqlx` to connect to the database. #[async_trait] -pub trait AsyncQueueable: Send { +pub trait AsyncQueueable: Send + Sync { /// This method should retrieve one task of the `task_type` type. If `task_type` is `None` it will try to /// fetch a task of the type `common`. After fetching it should update the state of the task to /// `FangTaskState::InProgress`. /// async fn fetch_and_touch_task( - &mut self, + &self, task_type: Option, ) -> Result, AsyncQueueError>; /// Enqueue a task to the queue, The task will be executed as soon as possible by the worker of the same type /// created by an AsyncWorkerPool. - async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result; + async fn insert_task(&self, task: &dyn AsyncRunnable) -> Result; /// The method will remove all tasks from the queue - async fn remove_all_tasks(&mut self) -> Result; + async fn remove_all_tasks(&self) -> Result; /// Remove all tasks that are scheduled in the future. - async fn remove_all_scheduled_tasks(&mut self) -> Result; + async fn remove_all_scheduled_tasks(&self) -> Result; /// Remove a task by its id. - async fn remove_task(&mut self, id: &Uuid) -> Result; + async fn remove_task(&self, id: &Uuid) -> Result; /// Remove a task by its metadata (struct fields values) async fn remove_task_by_metadata( - &mut self, + &self, task: &dyn AsyncRunnable, ) -> Result; /// Removes all tasks that have the specified `task_type`. - async fn remove_tasks_type(&mut self, task_type: &str) -> Result; + async fn remove_tasks_type(&self, task_type: &str) -> Result; /// Retrieve a task from storage by its `id`. - async fn find_task_by_id(&mut self, id: &Uuid) -> Result; + async fn find_task_by_id(&self, id: &Uuid) -> Result; /// Update the state field of the specified task /// See the `FangTaskState` enum for possible states. async fn update_task_state( - &mut self, + &self, task: &Task, state: FangTaskState, ) -> Result; /// Update the state of a task to `FangTaskState::Failed` and set an error_message. - async fn fail_task( - &mut self, - task: &Task, - error_message: &str, - ) -> Result; + async fn fail_task(&self, task: &Task, error_message: &str) -> Result; /// Schedule a task. - async fn schedule_task(&mut self, task: &dyn AsyncRunnable) -> Result; + async fn schedule_task(&self, task: &dyn AsyncRunnable) -> Result; async fn schedule_retry( - &mut self, + &self, task: &Task, backoff_seconds: u32, error: &str, @@ -424,7 +420,7 @@ impl AsyncQueue { #[async_trait] impl AsyncQueueable for AsyncQueue { - async fn find_task_by_id(&mut self, id: &Uuid) -> Result { + async fn find_task_by_id(&self, id: &Uuid) -> Result { self.check_if_connection()?; let pool = &self.pool; @@ -441,7 +437,7 @@ impl AsyncQueueable for AsyncQueue { } async fn fetch_and_touch_task( - &mut self, + &self, task_type: Option, ) -> Result, AsyncQueueError> { self.check_if_connection()?; @@ -455,7 +451,7 @@ impl AsyncQueueable for AsyncQueue { Ok(task) } - async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result { + async fn insert_task(&self, task: &dyn AsyncRunnable) -> Result { self.check_if_connection()?; // this unwrap is safe because we check if connection is established let pool = &self.pool; @@ -480,7 +476,7 @@ impl AsyncQueueable for AsyncQueue { Ok(task) } - async fn schedule_task(&mut self, task: &dyn AsyncRunnable) -> Result { + async fn schedule_task(&self, task: &dyn AsyncRunnable) -> Result { self.check_if_connection()?; // this unwrap is safe because we check if connection is established let pool = &self.pool; @@ -491,7 +487,7 @@ impl AsyncQueueable for AsyncQueue { Ok(task) } - async fn remove_all_tasks(&mut self) -> Result { + async fn remove_all_tasks(&self) -> Result { self.check_if_connection()?; // this unwrap is safe because we check if connection is established let pool = &self.pool; @@ -507,7 +503,7 @@ impl AsyncQueueable for AsyncQueue { Ok(result) } - async fn remove_all_scheduled_tasks(&mut self) -> Result { + async fn remove_all_scheduled_tasks(&self) -> Result { self.check_if_connection()?; // this unwrap is safe because we check if connection is established let pool = &self.pool; @@ -524,7 +520,7 @@ impl AsyncQueueable for AsyncQueue { Ok(result) } - async fn remove_task(&mut self, id: &Uuid) -> Result { + async fn remove_task(&self, id: &Uuid) -> Result { self.check_if_connection()?; let pool = &self.pool; let backend = pool.backend()?; @@ -540,7 +536,7 @@ impl AsyncQueueable for AsyncQueue { } async fn remove_task_by_metadata( - &mut self, + &self, task: &dyn AsyncRunnable, ) -> Result { if task.uniq() { @@ -561,7 +557,7 @@ impl AsyncQueueable for AsyncQueue { } } - async fn remove_tasks_type(&mut self, task_type: &str) -> Result { + async fn remove_tasks_type(&self, task_type: &str) -> Result { self.check_if_connection()?; let pool = &self.pool; let backend = pool.backend()?; @@ -577,7 +573,7 @@ impl AsyncQueueable for AsyncQueue { } async fn update_task_state( - &mut self, + &self, task: &Task, state: FangTaskState, ) -> Result { @@ -595,11 +591,7 @@ impl AsyncQueueable for AsyncQueue { Ok(task) } - async fn fail_task( - &mut self, - task: &Task, - error_message: &str, - ) -> Result { + async fn fail_task(&self, task: &Task, error_message: &str) -> Result { self.check_if_connection()?; let pool = &self.pool; let backend = pool.backend()?; @@ -618,7 +610,7 @@ impl AsyncQueueable for AsyncQueue { } async fn schedule_retry( - &mut self, + &self, task: &Task, backoff_seconds: u32, error: &str, diff --git a/fang/src/asynk/async_queue/async_queue_tests.rs b/fang/src/asynk/async_queue/async_queue_tests.rs index 62e836b3..3f60a738 100644 --- a/fang/src/asynk/async_queue/async_queue_tests.rs +++ b/fang/src/asynk/async_queue/async_queue_tests.rs @@ -15,7 +15,7 @@ pub(crate) struct AsyncTask { #[typetag::serde] #[async_trait] impl AsyncRunnable for AsyncTask { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> { + async fn run(&self, _queueable: &dyn AsyncQueueable) -> Result<(), FangError> { Ok(()) } } @@ -28,7 +28,7 @@ pub(crate) struct AsyncUniqTask { #[typetag::serde] #[async_trait] impl AsyncRunnable for AsyncUniqTask { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> { + async fn run(&self, _queueable: &dyn AsyncQueueable) -> Result<(), FangError> { Ok(()) } @@ -46,7 +46,7 @@ pub(crate) struct AsyncTaskSchedule { #[typetag::serde] #[async_trait] impl AsyncRunnable for AsyncTaskSchedule { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> { + async fn run(&self, _queueable: &dyn AsyncQueueable) -> Result<(), FangError> { Ok(()) } @@ -77,7 +77,7 @@ macro_rules! test_asynk_queue { #[tokio::test] async fn insert_task_creates_new_task() { - let mut test: $q = $e.await; + let test: $q = $e.await; let task = test.insert_task(&AsyncTask { number: 1 }).await.unwrap(); @@ -91,7 +91,7 @@ macro_rules! test_asynk_queue { #[tokio::test] async fn update_task_state_test() { - let mut test: $q = $e.await; + let test: $q = $e.await; let task = test.insert_task(&AsyncTask { number: 1 }).await.unwrap(); @@ -114,7 +114,7 @@ macro_rules! test_asynk_queue { #[tokio::test] async fn failed_task_test() { - let mut test: $q = $e.await; + let test: $q = $e.await; let task = test.insert_task(&AsyncTask { number: 1 }).await.unwrap(); @@ -135,7 +135,7 @@ macro_rules! test_asynk_queue { #[tokio::test] async fn remove_all_tasks_test() { - let mut test: $q = $e.await; + let test: $q = $e.await; let task = test.insert_task(&AsyncTask { number: 1 }).await.unwrap(); @@ -161,7 +161,7 @@ macro_rules! test_asynk_queue { #[tokio::test] async fn schedule_task_test() { - let mut test: $q = $e.await; + let test: $q = $e.await; let datetime = (Utc::now() + Duration::seconds(7)).round_subsecs(0); @@ -183,7 +183,7 @@ macro_rules! test_asynk_queue { #[tokio::test] async fn remove_all_scheduled_tasks_test() { - let mut test: $q = $e.await; + let test: $q = $e.await; let datetime = (Utc::now() + Duration::seconds(7)).round_subsecs(0); @@ -207,7 +207,7 @@ macro_rules! test_asynk_queue { #[tokio::test] async fn fetch_and_touch_test() { - let mut test: $q = $e.await; + let test: $q = $e.await; let task = test.insert_task(&AsyncTask { number: 1 }).await.unwrap(); @@ -247,7 +247,7 @@ macro_rules! test_asynk_queue { #[tokio::test] async fn remove_tasks_type_test() { - let mut test: $q = $e.await; + let test: $q = $e.await; let task = test.insert_task(&AsyncTask { number: 1 }).await.unwrap(); @@ -279,7 +279,7 @@ macro_rules! test_asynk_queue { #[tokio::test] async fn remove_tasks_by_metadata() { //console_subscriber::init(); - let mut test: $q = $e.await; + let test: $q = $e.await; let task = test .insert_task(&AsyncUniqTask { number: 1 }) diff --git a/fang/src/asynk/async_runnable.rs b/fang/src/asynk/async_runnable.rs index 3a73148d..bc0852b3 100644 --- a/fang/src/asynk/async_runnable.rs +++ b/fang/src/asynk/async_runnable.rs @@ -34,7 +34,7 @@ impl From for FangError { #[async_trait] pub trait AsyncRunnable: Send + Sync { /// Execute the task. This method should define its logic - async fn run(&self, client: &mut dyn AsyncQueueable) -> Result<(), FangError>; + async fn run(&self, client: &dyn AsyncQueueable) -> Result<(), FangError>; /// Define the type of the task. /// The `common` task type is used by default diff --git a/fang/src/asynk/async_worker.rs b/fang/src/asynk/async_worker.rs index 7c73227d..a0d3d420 100644 --- a/fang/src/asynk/async_worker.rs +++ b/fang/src/asynk/async_worker.rs @@ -30,7 +30,7 @@ where AQueue: AsyncQueueable + Clone + Sync + 'static, { async fn run(&mut self, task: &Task, runnable: &dyn AsyncRunnable) -> Result<(), FangError> { - let result = runnable.run(&mut self.queue).await; + let result = runnable.run(&self.queue).await; match result { Ok(_) => self.finalize_task(task, &result).await?, @@ -272,7 +272,7 @@ mod async_worker_tests { #[typetag::serde] #[async_trait] impl AsyncRunnable for WorkerAsyncTask { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> { + async fn run(&self, _queueable: &dyn AsyncQueueable) -> Result<(), FangError> { Ok(()) } } @@ -285,7 +285,7 @@ mod async_worker_tests { #[typetag::serde] #[async_trait] impl AsyncRunnable for WorkerAsyncTaskSchedule { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> { + async fn run(&self, _queueable: &dyn AsyncQueueable) -> Result<(), FangError> { Ok(()) } fn cron(&self) -> Option { @@ -299,7 +299,7 @@ mod async_worker_tests { #[typetag::serde] #[async_trait] impl AsyncRunnable for WorkerAsyncTaskScheduled { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> { + async fn run(&self, _queueable: &dyn AsyncQueueable) -> Result<(), FangError> { log::info!("WorkerAsyncTaskScheduled has been run"); tokio::time::sleep(std::time::Duration::from_millis(2050)).await; Ok(()) @@ -322,7 +322,7 @@ mod async_worker_tests { #[typetag::serde] #[async_trait] impl AsyncRunnable for AsyncFailedTask { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> { + async fn run(&self, _queueable: &dyn AsyncQueueable) -> Result<(), FangError> { let message = format!("number {} is wrong :(", self.number); Err(FangError { @@ -341,7 +341,7 @@ mod async_worker_tests { #[typetag::serde] #[async_trait] impl AsyncRunnable for AsyncRetryTask { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> { + async fn run(&self, _queueable: &dyn AsyncQueueable) -> Result<(), FangError> { let message = "Failed".to_string(); Err(FangError { @@ -360,7 +360,7 @@ mod async_worker_tests { #[typetag::serde] #[async_trait] impl AsyncRunnable for AsyncTaskType1 { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> { + async fn run(&self, _queueable: &dyn AsyncQueueable) -> Result<(), FangError> { Ok(()) } @@ -375,7 +375,7 @@ mod async_worker_tests { #[typetag::serde] #[async_trait] impl AsyncRunnable for AsyncTaskType2 { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> { + async fn run(&self, _queueable: &dyn AsyncQueueable) -> Result<(), FangError> { Ok(()) }