Skip to content

Commit

Permalink
remove backend field from AsyncQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
Dopplerian committed Apr 15, 2024
1 parent 890531a commit fe69c66
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 27 deletions.
58 changes: 33 additions & 25 deletions fang/src/asynk/async_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,17 @@ impl InternalPool {
_ => panic!("Not a SqlitePool!"),
}
}

pub(crate) fn backend(&self) -> BackendSqlX {
match *self {
#[cfg(feature = "asynk-postgres")]
InternalPool::Pg(_) => BackendSqlX::Pg,
#[cfg(feature = "asynk-mysql")]
InternalPool::MySql(_) => BackendSqlX::MySql,
#[cfg(feature = "asynk-sqlite")]
InternalPool::Sqlite(_) => BackendSqlX::Sqlite,
}
}
}

#[derive(TypedBuilder, Debug, Clone)]
Expand All @@ -203,8 +214,6 @@ pub struct AsyncQueue {
max_pool_size: u32,
#[builder(default = false, setter(skip))]
connected: bool,
#[builder(default = BackendSqlX::NoBackend, setter(skip))]
backend: BackendSqlX,
}

#[cfg(test)]
Expand Down Expand Up @@ -287,7 +296,6 @@ impl AsyncQueue {
let (backend, pool) = get_backend(kind, &self.uri, self.max_pool_size).await?;

self.pool = Some(pool);
self.backend = backend;
self.connected = true;
Ok(())
}
Expand Down Expand Up @@ -420,8 +428,8 @@ impl AsyncQueueable for AsyncQueue {

let query_params = QueryParams::builder().uuid(id).build();

let task = self
.backend
let task = pool
.backend()
.execute_query(SqlXQuery::FindTaskById, pool, query_params)
.await?
.unwrap_task();
Expand All @@ -437,7 +445,7 @@ impl AsyncQueueable for AsyncQueue {
// this unwrap is safe because we check if connection is established
let pool = self.pool.as_ref().unwrap();

let task = Self::fetch_and_touch_task_query(pool, &self.backend, task_type).await?;
let task = Self::fetch_and_touch_task_query(pool, &pool.backend(), task_type).await?;

Ok(task)
}
Expand All @@ -451,7 +459,7 @@ impl AsyncQueueable for AsyncQueue {
let task = if !task.uniq() {
Self::insert_task_query(
pool,
&self.backend,
&pool.backend(),
&metadata,
&task.task_type(),
&Utc::now(),
Expand All @@ -460,7 +468,7 @@ impl AsyncQueueable for AsyncQueue {
} else {
Self::insert_task_if_not_exist_query(
pool,
&self.backend,
&pool.backend(),
&metadata,
&task.task_type(),
&Utc::now(),
Expand All @@ -476,7 +484,7 @@ impl AsyncQueueable for AsyncQueue {
// this unwrap is safe because we check if connection is established
let pool = self.pool.as_ref().unwrap();

let task = Self::schedule_task_query(pool, &self.backend, task).await?;
let task = Self::schedule_task_query(pool, &pool.backend(), task).await?;

Ok(task)
}
Expand All @@ -488,8 +496,8 @@ impl AsyncQueueable for AsyncQueue {

let query_params = QueryParams::builder().build();

let result = self
.backend
let result = pool
.backend()
.execute_query(SqlXQuery::RemoveAllTask, pool, query_params)
.await?
.unwrap_u64();
Expand All @@ -504,8 +512,8 @@ impl AsyncQueueable for AsyncQueue {

let query_params = QueryParams::builder().build();

let result = self
.backend
let result = pool
.backend()
.execute_query(SqlXQuery::RemoveAllScheduledTask, pool, query_params)
.await?
.unwrap_u64();
Expand All @@ -519,8 +527,8 @@ impl AsyncQueueable for AsyncQueue {

let query_params = QueryParams::builder().uuid(id).build();

let result = self
.backend
let result = pool
.backend()
.execute_query(SqlXQuery::RemoveTask, pool, query_params)
.await?
.unwrap_u64();
Expand All @@ -538,8 +546,8 @@ impl AsyncQueueable for AsyncQueue {

let query_params = QueryParams::builder().runnable(task).build();

let result = self
.backend
let result = pool
.backend()
.execute_query(SqlXQuery::RemoveTaskByMetadata, pool, query_params)
.await?
.unwrap_u64();
Expand All @@ -556,8 +564,8 @@ impl AsyncQueueable for AsyncQueue {

let query_params = QueryParams::builder().task_type(task_type).build();

let result = self
.backend
let result = pool
.backend()
.execute_query(SqlXQuery::RemoveTaskType, pool, query_params)
.await?
.unwrap_u64();
Expand All @@ -575,8 +583,8 @@ impl AsyncQueueable for AsyncQueue {

let query_params = QueryParams::builder().uuid(&task.id).state(state).build();

let task = self
.backend
let task = pool
.backend()
.execute_query(SqlXQuery::UpdateTaskState, pool, query_params)
.await?
.unwrap_task();
Expand All @@ -597,8 +605,8 @@ impl AsyncQueueable for AsyncQueue {
.task(task)
.build();

let failed_task = self
.backend
let failed_task = pool
.backend()
.execute_query(SqlXQuery::FailTask, pool, query_params)
.await?
.unwrap_task();
Expand All @@ -622,8 +630,8 @@ impl AsyncQueueable for AsyncQueue {
.task(task)
.build();

let failed_task = self
.backend
let failed_task = pool
.backend()
.execute_query(SqlXQuery::RetryTask, pool, query_params)
.await?
.unwrap_task();
Expand Down
2 changes: 0 additions & 2 deletions fang/src/asynk/backend_sqlx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ pub(crate) enum BackendSqlX {

#[cfg(feature = "asynk-mysql")]
MySql,

NoBackend,
}

#[allow(dead_code)]
Expand Down

0 comments on commit fe69c66

Please sign in to comment.