Skip to content

Commit

Permalink
improving API
Browse files Browse the repository at this point in the history
  • Loading branch information
pxp9 committed Apr 18, 2024
1 parent e7c15bb commit af8e0b4
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 57 deletions.
4 changes: 2 additions & 2 deletions fang/fang_examples/asynk/simple_async_worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl MyFailingTask {
#[async_trait]
#[typetag::serde]
impl AsyncRunnable for MyTask {
async fn run(&self, queue: &mut dyn AsyncQueueable) -> Result<(), FangError> {
async fn run(&self, queue: &dyn AsyncQueueable) -> Result<(), FangError> {
let new_task = MyTask::new(self.number + 1);
queue
.insert_task(&new_task as &dyn AsyncRunnable)
Expand All @@ -50,7 +50,7 @@ impl AsyncRunnable for MyTask {
#[async_trait]
#[typetag::serde]
impl AsyncRunnable for MyFailingTask {
async fn run(&self, queue: &mut dyn AsyncQueueable) -> Result<(), FangError> {
async fn run(&self, queue: &dyn AsyncQueueable) -> Result<(), FangError> {
let new_task = MyFailingTask::new(self.number + 1);
queue
.insert_task(&new_task as &dyn AsyncRunnable)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub struct MyCronTask {}
#[async_trait]
#[typetag::serde]
impl AsyncRunnable for MyCronTask {
async fn run(&self, _queue: &mut dyn AsyncQueueable) -> Result<(), FangError> {
async fn run(&self, _queue: &dyn AsyncQueueable) -> Result<(), FangError> {
log::info!("CRON!!!!!!!!!!!!!!!",);

Ok(())
Expand Down
58 changes: 25 additions & 33 deletions fang/src/asynk/async_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,61 +79,57 @@ impl From<cron::error::Error> for AsyncQueueError {
/// This is implemented by the `AsyncQueue` struct which uses internally a `AnyPool` of `sqlx` to connect to the database.
#[async_trait]
pub trait AsyncQueueable: Send {
pub trait AsyncQueueable: Send + Sync {
/// This method should retrieve one task of the `task_type` type. If `task_type` is `None` it will try to
/// fetch a task of the type `common`. After fetching it should update the state of the task to
/// `FangTaskState::InProgress`.
///
async fn fetch_and_touch_task(
&mut self,
&self,
task_type: Option<String>,
) -> Result<Option<Task>, AsyncQueueError>;

/// Enqueue a task to the queue, The task will be executed as soon as possible by the worker of the same type
/// created by an AsyncWorkerPool.
async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError>;
async fn insert_task(&self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError>;

/// The method will remove all tasks from the queue
async fn remove_all_tasks(&mut self) -> Result<u64, AsyncQueueError>;
async fn remove_all_tasks(&self) -> Result<u64, AsyncQueueError>;

/// Remove all tasks that are scheduled in the future.
async fn remove_all_scheduled_tasks(&mut self) -> Result<u64, AsyncQueueError>;
async fn remove_all_scheduled_tasks(&self) -> Result<u64, AsyncQueueError>;

/// Remove a task by its id.
async fn remove_task(&mut self, id: &Uuid) -> Result<u64, AsyncQueueError>;
async fn remove_task(&self, id: &Uuid) -> Result<u64, AsyncQueueError>;

/// Remove a task by its metadata (struct fields values)
async fn remove_task_by_metadata(
&mut self,
&self,
task: &dyn AsyncRunnable,
) -> Result<u64, AsyncQueueError>;

/// Removes all tasks that have the specified `task_type`.
async fn remove_tasks_type(&mut self, task_type: &str) -> Result<u64, AsyncQueueError>;
async fn remove_tasks_type(&self, task_type: &str) -> Result<u64, AsyncQueueError>;

/// Retrieve a task from storage by its `id`.
async fn find_task_by_id(&mut self, id: &Uuid) -> Result<Task, AsyncQueueError>;
async fn find_task_by_id(&self, id: &Uuid) -> Result<Task, AsyncQueueError>;

/// Update the state field of the specified task
/// See the `FangTaskState` enum for possible states.
async fn update_task_state(
&mut self,
&self,
task: &Task,
state: FangTaskState,
) -> Result<Task, AsyncQueueError>;

/// Update the state of a task to `FangTaskState::Failed` and set an error_message.
async fn fail_task(
&mut self,
task: &Task,
error_message: &str,
) -> Result<Task, AsyncQueueError>;
async fn fail_task(&self, task: &Task, error_message: &str) -> Result<Task, AsyncQueueError>;

/// Schedule a task.
async fn schedule_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError>;
async fn schedule_task(&self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError>;

async fn schedule_retry(
&mut self,
&self,
task: &Task,
backoff_seconds: u32,
error: &str,
Expand Down Expand Up @@ -424,7 +420,7 @@ impl AsyncQueue {

#[async_trait]
impl AsyncQueueable for AsyncQueue {
async fn find_task_by_id(&mut self, id: &Uuid) -> Result<Task, AsyncQueueError> {
async fn find_task_by_id(&self, id: &Uuid) -> Result<Task, AsyncQueueError> {
self.check_if_connection()?;
let pool = &self.pool;

Expand All @@ -441,7 +437,7 @@ impl AsyncQueueable for AsyncQueue {
}

async fn fetch_and_touch_task(
&mut self,
&self,
task_type: Option<String>,
) -> Result<Option<Task>, AsyncQueueError> {
self.check_if_connection()?;
Expand All @@ -455,7 +451,7 @@ impl AsyncQueueable for AsyncQueue {
Ok(task)
}

async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError> {
async fn insert_task(&self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError> {
self.check_if_connection()?;
// this unwrap is safe because we check if connection is established
let pool = &self.pool;
Expand All @@ -480,7 +476,7 @@ impl AsyncQueueable for AsyncQueue {
Ok(task)
}

async fn schedule_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError> {
async fn schedule_task(&self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError> {
self.check_if_connection()?;
// this unwrap is safe because we check if connection is established
let pool = &self.pool;
Expand All @@ -491,7 +487,7 @@ impl AsyncQueueable for AsyncQueue {
Ok(task)
}

async fn remove_all_tasks(&mut self) -> Result<u64, AsyncQueueError> {
async fn remove_all_tasks(&self) -> Result<u64, AsyncQueueError> {
self.check_if_connection()?;
// this unwrap is safe because we check if connection is established
let pool = &self.pool;
Expand All @@ -507,7 +503,7 @@ impl AsyncQueueable for AsyncQueue {
Ok(result)
}

async fn remove_all_scheduled_tasks(&mut self) -> Result<u64, AsyncQueueError> {
async fn remove_all_scheduled_tasks(&self) -> Result<u64, AsyncQueueError> {
self.check_if_connection()?;
// this unwrap is safe because we check if connection is established
let pool = &self.pool;
Expand All @@ -524,7 +520,7 @@ impl AsyncQueueable for AsyncQueue {
Ok(result)
}

async fn remove_task(&mut self, id: &Uuid) -> Result<u64, AsyncQueueError> {
async fn remove_task(&self, id: &Uuid) -> Result<u64, AsyncQueueError> {
self.check_if_connection()?;
let pool = &self.pool;
let backend = pool.backend()?;
Expand All @@ -540,7 +536,7 @@ impl AsyncQueueable for AsyncQueue {
}

async fn remove_task_by_metadata(
&mut self,
&self,
task: &dyn AsyncRunnable,
) -> Result<u64, AsyncQueueError> {
if task.uniq() {
Expand All @@ -561,7 +557,7 @@ impl AsyncQueueable for AsyncQueue {
}
}

async fn remove_tasks_type(&mut self, task_type: &str) -> Result<u64, AsyncQueueError> {
async fn remove_tasks_type(&self, task_type: &str) -> Result<u64, AsyncQueueError> {
self.check_if_connection()?;
let pool = &self.pool;
let backend = pool.backend()?;
Expand All @@ -577,7 +573,7 @@ impl AsyncQueueable for AsyncQueue {
}

async fn update_task_state(
&mut self,
&self,
task: &Task,
state: FangTaskState,
) -> Result<Task, AsyncQueueError> {
Expand All @@ -595,11 +591,7 @@ impl AsyncQueueable for AsyncQueue {
Ok(task)
}

async fn fail_task(
&mut self,
task: &Task,
error_message: &str,
) -> Result<Task, AsyncQueueError> {
async fn fail_task(&self, task: &Task, error_message: &str) -> Result<Task, AsyncQueueError> {
self.check_if_connection()?;
let pool = &self.pool;
let backend = pool.backend()?;
Expand All @@ -618,7 +610,7 @@ impl AsyncQueueable for AsyncQueue {
}

async fn schedule_retry(
&mut self,
&self,
task: &Task,
backoff_seconds: u32,
error: &str,
Expand Down
24 changes: 12 additions & 12 deletions fang/src/asynk/async_queue/async_queue_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub(crate) struct AsyncTask {
#[typetag::serde]
#[async_trait]
impl AsyncRunnable for AsyncTask {
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> {
async fn run(&self, _queueable: &dyn AsyncQueueable) -> Result<(), FangError> {
Ok(())
}
}
Expand All @@ -28,7 +28,7 @@ pub(crate) struct AsyncUniqTask {
#[typetag::serde]
#[async_trait]
impl AsyncRunnable for AsyncUniqTask {
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> {
async fn run(&self, _queueable: &dyn AsyncQueueable) -> Result<(), FangError> {
Ok(())
}

Expand All @@ -46,7 +46,7 @@ pub(crate) struct AsyncTaskSchedule {
#[typetag::serde]
#[async_trait]
impl AsyncRunnable for AsyncTaskSchedule {
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> {
async fn run(&self, _queueable: &dyn AsyncQueueable) -> Result<(), FangError> {
Ok(())
}

Expand Down Expand Up @@ -77,7 +77,7 @@ macro_rules! test_asynk_queue {

#[tokio::test]
async fn insert_task_creates_new_task() {
let mut test: $q = $e.await;
let test: $q = $e.await;

let task = test.insert_task(&AsyncTask { number: 1 }).await.unwrap();

Expand All @@ -91,7 +91,7 @@ macro_rules! test_asynk_queue {

#[tokio::test]
async fn update_task_state_test() {
let mut test: $q = $e.await;
let test: $q = $e.await;

let task = test.insert_task(&AsyncTask { number: 1 }).await.unwrap();

Expand All @@ -114,7 +114,7 @@ macro_rules! test_asynk_queue {

#[tokio::test]
async fn failed_task_test() {
let mut test: $q = $e.await;
let test: $q = $e.await;

let task = test.insert_task(&AsyncTask { number: 1 }).await.unwrap();

Expand All @@ -135,7 +135,7 @@ macro_rules! test_asynk_queue {

#[tokio::test]
async fn remove_all_tasks_test() {
let mut test: $q = $e.await;
let test: $q = $e.await;

let task = test.insert_task(&AsyncTask { number: 1 }).await.unwrap();

Expand All @@ -161,7 +161,7 @@ macro_rules! test_asynk_queue {

#[tokio::test]
async fn schedule_task_test() {
let mut test: $q = $e.await;
let test: $q = $e.await;

let datetime = (Utc::now() + Duration::seconds(7)).round_subsecs(0);

Expand All @@ -183,7 +183,7 @@ macro_rules! test_asynk_queue {

#[tokio::test]
async fn remove_all_scheduled_tasks_test() {
let mut test: $q = $e.await;
let test: $q = $e.await;

let datetime = (Utc::now() + Duration::seconds(7)).round_subsecs(0);

Expand All @@ -207,7 +207,7 @@ macro_rules! test_asynk_queue {

#[tokio::test]
async fn fetch_and_touch_test() {
let mut test: $q = $e.await;
let test: $q = $e.await;

let task = test.insert_task(&AsyncTask { number: 1 }).await.unwrap();

Expand Down Expand Up @@ -247,7 +247,7 @@ macro_rules! test_asynk_queue {

#[tokio::test]
async fn remove_tasks_type_test() {
let mut test: $q = $e.await;
let test: $q = $e.await;

let task = test.insert_task(&AsyncTask { number: 1 }).await.unwrap();

Expand Down Expand Up @@ -279,7 +279,7 @@ macro_rules! test_asynk_queue {
#[tokio::test]
async fn remove_tasks_by_metadata() {
//console_subscriber::init();
let mut test: $q = $e.await;
let test: $q = $e.await;

let task = test
.insert_task(&AsyncUniqTask { number: 1 })
Expand Down
2 changes: 1 addition & 1 deletion fang/src/asynk/async_runnable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl From<SerdeError> for FangError {
#[async_trait]
pub trait AsyncRunnable: Send + Sync {
/// Execute the task. This method should define its logic
async fn run(&self, client: &mut dyn AsyncQueueable) -> Result<(), FangError>;
async fn run(&self, client: &dyn AsyncQueueable) -> Result<(), FangError>;

/// Define the type of the task.
/// The `common` task type is used by default
Expand Down
Loading

0 comments on commit af8e0b4

Please sign in to comment.