From 7ec0e799d301897ea8c6444c91cf3b84c38a372e Mon Sep 17 00:00:00 2001 From: Pmarquez <48651252+pxp9@users.noreply.github.com> Date: Mon, 15 Apr 2024 09:30:51 +0200 Subject: [PATCH] Fix/independent decoding (#149) * independent decoding * Postgres and Sqlite passing * fix unreachable pattern warn * delete vscode stuff * I think this may be a Rust compiler issue, but I fixed it so I will take it * fix clippy --- fang/src/asynk/async_queue.rs | 167 ++++- .../asynk/async_queue/async_queue_tests.rs | 2 +- fang/src/asynk/backend_sqlx.rs | 575 ++++++++------- fang/src/asynk/backend_sqlx/mysql.rs | 671 +++++++++++------- fang/src/asynk/backend_sqlx/postgres.rs | 174 +++-- fang/src/asynk/backend_sqlx/sqlite.rs | 177 +++-- 6 files changed, 1112 insertions(+), 654 deletions(-) diff --git a/fang/src/asynk/async_queue.rs b/fang/src/asynk/async_queue.rs index a9c1b18..013670a 100644 --- a/fang/src/asynk/async_queue.rs +++ b/fang/src/asynk/async_queue.rs @@ -13,17 +13,35 @@ use async_trait::async_trait; use chrono::DateTime; use chrono::Utc; use cron::Schedule; -//use sqlx::any::install_default_drivers; // this is supported in sqlx 0.7 +use sqlx::any::AnyConnectOptions; use sqlx::any::AnyKind; +#[cfg(any( + feature = "asynk-postgres", + feature = "asynk-mysql", + feature = "asynk-sqlite" +))] use sqlx::pool::PoolOptions; -use sqlx::Any; -use sqlx::AnyPool; -use sqlx::Pool; +//use sqlx::any::install_default_drivers; // this is supported in sqlx 0.7 use std::str::FromStr; use thiserror::Error; use typed_builder::TypedBuilder; use uuid::Uuid; +#[cfg(feature = "asynk-postgres")] +use sqlx::PgPool; +#[cfg(feature = "asynk-postgres")] +use sqlx::Postgres; + +#[cfg(feature = "asynk-mysql")] +use sqlx::MySql; +#[cfg(feature = "asynk-mysql")] +use sqlx::MySqlPool; + +#[cfg(feature = "asynk-sqlite")] +use sqlx::Sqlite; +#[cfg(feature = "asynk-sqlite")] +use sqlx::SqlitePool; + #[cfg(test)] use self::async_queue_tests::test_asynk_queue; @@ -134,11 +152,51 @@ pub trait AsyncQueueable: Send { /// .build(); /// ``` /// +/// + +#[derive(Debug, Clone)] +pub(crate) enum InternalPool { + #[cfg(feature = "asynk-postgres")] + Pg(PgPool), + #[cfg(feature = "asynk-mysql")] + MySql(MySqlPool), + #[cfg(feature = "asynk-sqlite")] + Sqlite(SqlitePool), +} + +impl InternalPool { + #[cfg(feature = "asynk-postgres")] + pub(crate) fn unwrap_pg_pool(&self) -> &PgPool { + match self { + InternalPool::Pg(pool) => pool, + #[allow(unreachable_patterns)] + _ => panic!("Not a PgPool!"), + } + } + + #[cfg(feature = "asynk-mysql")] + pub(crate) fn unwrap_mysql_pool(&self) -> &MySqlPool { + match self { + InternalPool::MySql(pool) => pool, + #[allow(unreachable_patterns)] + _ => panic!("Not a MySqlPool!"), + } + } + + #[cfg(feature = "asynk-sqlite")] + pub(crate) fn unwrap_sqlite_pool(&self) -> &SqlitePool { + match self { + InternalPool::Sqlite(pool) => pool, + #[allow(unreachable_patterns)] + _ => panic!("Not a SqlitePool!"), + } + } +} #[derive(TypedBuilder, Debug, Clone)] pub struct AsyncQueue { #[builder(default=None, setter(skip))] - pool: Option, + pool: Option, #[builder(setter(into))] uri: String, #[builder(setter(into))] @@ -152,19 +210,19 @@ pub struct AsyncQueue { #[cfg(test)] use tokio::sync::Mutex; -#[cfg(test)] +#[cfg(all(test, feature = "asynk-postgres"))] static ASYNC_QUEUE_POSTGRES_TEST_COUNTER: Mutex = Mutex::const_new(0); -#[cfg(test)] +#[cfg(all(test, feature = "asynk-sqlite"))] static ASYNC_QUEUE_SQLITE_TEST_COUNTER: Mutex = Mutex::const_new(0); -#[cfg(test)] +#[cfg(all(test, feature = "asynk-mysql"))] static ASYNC_QUEUE_MYSQL_TEST_COUNTER: Mutex = Mutex::const_new(0); #[cfg(test)] use sqlx::Executor; -#[cfg(test)] +#[cfg(all(test, feature = "asynk-sqlite"))] use std::path::Path; #[cfg(test)] @@ -172,16 +230,41 @@ use std::env; use super::backend_sqlx::BackendSqlX; -fn get_backend(_anykind: AnyKind) -> BackendSqlX { - match _anykind { +async fn get_backend( + kind: AnyKind, + _uri: &str, + _max_connections: u32, +) -> Result<(BackendSqlX, InternalPool), AsyncQueueError> { + match kind { #[cfg(feature = "asynk-postgres")] - AnyKind::Postgres => BackendSqlX::Pg, + AnyKind::Postgres => { + let pool = PoolOptions::::new() + .max_connections(_max_connections) + .connect(_uri) + .await?; + + Ok((BackendSqlX::Pg, InternalPool::Pg(pool))) + } #[cfg(feature = "asynk-mysql")] - AnyKind::MySql => BackendSqlX::MySql, + AnyKind::MySql => { + let pool = PoolOptions::::new() + .max_connections(_max_connections) + .connect(_uri) + .await?; + + Ok((BackendSqlX::MySql, InternalPool::MySql(pool))) + } #[cfg(feature = "asynk-sqlite")] - AnyKind::Sqlite => BackendSqlX::Sqlite, + AnyKind::Sqlite => { + let pool = PoolOptions::::new() + .max_connections(_max_connections) + .connect(_uri) + .await?; + + Ok((BackendSqlX::Sqlite, InternalPool::Sqlite(pool))) + } #[allow(unreachable_patterns)] - _ => unreachable!(), + _ => panic!("Not a valid backend"), } } @@ -199,22 +282,18 @@ impl AsyncQueue { pub async fn connect(&mut self) -> Result<(), AsyncQueueError> { //install_default_drivers(); - let pool: AnyPool = PoolOptions::new() - .max_connections(self.max_pool_size) - .connect(&self.uri) - .await?; + let kind: AnyKind = self.uri.parse::()?.kind(); - let anykind = pool.any_kind(); - - self.backend = get_backend(anykind); + let (backend, pool) = get_backend(kind, &self.uri, self.max_pool_size).await?; self.pool = Some(pool); + self.backend = backend; self.connected = true; Ok(()) } async fn fetch_and_touch_task_query( - pool: &Pool, + pool: &InternalPool, backend: &BackendSqlX, task_type: Option, ) -> Result, AsyncQueueError> { @@ -250,7 +329,7 @@ impl AsyncQueue { } async fn insert_task_query( - pool: &Pool, + pool: &InternalPool, backend: &BackendSqlX, metadata: &serde_json::Value, task_type: &str, @@ -271,7 +350,7 @@ impl AsyncQueue { } async fn insert_task_if_not_exist_query( - pool: &Pool, + pool: &InternalPool, backend: &BackendSqlX, metadata: &serde_json::Value, task_type: &str, @@ -292,7 +371,7 @@ impl AsyncQueue { } async fn schedule_task_query( - pool: &Pool, + pool: &InternalPool, backend: &BackendSqlX, task: &dyn AsyncRunnable, ) -> Result { @@ -553,7 +632,7 @@ impl AsyncQueueable for AsyncQueue { } } -#[cfg(test)] +#[cfg(all(test, feature = "asynk-postgres"))] impl AsyncQueue { /// Provides an AsyncQueue connected to its own DB pub async fn test_postgres() -> Self { @@ -575,7 +654,14 @@ impl AsyncQueue { let create_query: &str = &format!("CREATE DATABASE {} WITH TEMPLATE fang;", db_name); let delete_query: &str = &format!("DROP DATABASE IF EXISTS {};", db_name); - let mut conn = res.pool.as_mut().unwrap().acquire().await.unwrap(); + let mut conn = res + .pool + .as_mut() + .unwrap() + .unwrap_pg_pool() + .acquire() + .await + .unwrap(); log::info!("Deleting database {db_name} ..."); conn.execute(delete_query).await.unwrap(); @@ -600,7 +686,10 @@ impl AsyncQueue { res } +} +#[cfg(all(test, feature = "asynk-sqlite"))] +impl AsyncQueue { /// Provides an AsyncQueue connected to its own DB pub async fn test_sqlite() -> Self { dotenvy::dotenv().expect(".env file not found"); @@ -632,7 +721,10 @@ impl AsyncQueue { res.connect().await.expect("fail to connect"); res } +} +#[cfg(all(test, feature = "asynk-mysql"))] +impl AsyncQueue { /// Provides an AsyncQueue connected to its own DB pub async fn test_mysql() -> Self { dotenvy::dotenv().expect(".env file not found"); @@ -657,7 +749,14 @@ impl AsyncQueue { let delete_query: &str = &format!("DROP DATABASE IF EXISTS {};", db_name); - let mut conn = res.pool.as_mut().unwrap().acquire().await.unwrap(); + let mut conn = res + .pool + .as_mut() + .unwrap() + .unwrap_mysql_pool() + .acquire() + .await + .unwrap(); log::info!("Deleting database {db_name} ..."); conn.execute(delete_query).await.unwrap(); @@ -684,11 +783,11 @@ impl AsyncQueue { } } -#[cfg(test)] -test_asynk_queue! {postgres, crate::AsyncQueue, crate::AsyncQueue::test_postgres()} +#[cfg(all(test, feature = "asynk-postgres"))] +test_asynk_queue! {postgres, crate::AsyncQueue,crate::AsyncQueue::test_postgres()} -#[cfg(test)] -test_asynk_queue! {sqlite, crate::AsyncQueue, crate::AsyncQueue::test_sqlite()} +#[cfg(all(test, feature = "asynk-sqlite"))] +test_asynk_queue! {sqlite, crate::AsyncQueue,crate::AsyncQueue::test_sqlite()} -#[cfg(test)] +#[cfg(all(test, feature = "asynk-mysql"))] test_asynk_queue! {mysql, crate::AsyncQueue, crate::AsyncQueue::test_mysql()} diff --git a/fang/src/asynk/async_queue/async_queue_tests.rs b/fang/src/asynk/async_queue/async_queue_tests.rs index fa9e42b..62e836b 100644 --- a/fang/src/asynk/async_queue/async_queue_tests.rs +++ b/fang/src/asynk/async_queue/async_queue_tests.rs @@ -113,7 +113,7 @@ macro_rules! test_asynk_queue { } #[tokio::test] - async fn failed_task_query_test() { + async fn failed_task_test() { let mut test: $q = $e.await; let task = test.insert_task(&AsyncTask { number: 1 }).await.unwrap(); diff --git a/fang/src/asynk/backend_sqlx.rs b/fang/src/asynk/backend_sqlx.rs index 0206553..10e4dae 100644 --- a/fang/src/asynk/backend_sqlx.rs +++ b/fang/src/asynk/backend_sqlx.rs @@ -1,8 +1,15 @@ use chrono::{DateTime, Duration, Utc}; use sha2::Digest; use sha2::Sha256; -use sqlx::Any; +use sqlx::any::AnyQueryResult; +use sqlx::database::HasArguments; +use sqlx::Database; +use sqlx::Encode; +use sqlx::Executor; +use sqlx::FromRow; +use sqlx::IntoArguments; use sqlx::Pool; +use sqlx::Type; use std::fmt::Debug; use typed_builder::TypedBuilder; use uuid::Uuid; @@ -35,6 +42,7 @@ pub(crate) enum BackendSqlX { NoBackend, } +#[allow(dead_code)] #[derive(TypedBuilder, Clone)] pub(crate) struct QueryParams<'a> { #[builder(default, setter(strip_option))] @@ -80,19 +88,25 @@ impl Res { } impl BackendSqlX { - pub(crate) async fn execute_query<'a>( + pub(crate) async fn execute_query( &self, _query: SqlXQuery, - _pool: &Pool, + _pool: &InternalPool, _params: QueryParams<'_>, ) -> Result { match self { #[cfg(feature = "asynk-postgres")] - BackendSqlX::Pg => BackendSqlXPg::execute_query(_query, _pool, _params).await, + BackendSqlX::Pg => { + BackendSqlXPg::execute_query(_query, _pool.unwrap_pg_pool(), _params).await + } #[cfg(feature = "asynk-sqlite")] - BackendSqlX::Sqlite => BackendSqlXSQLite::execute_query(_query, _pool, _params).await, + BackendSqlX::Sqlite => { + BackendSqlXSQLite::execute_query(_query, _pool.unwrap_sqlite_pool(), _params).await + } #[cfg(feature = "asynk-mysql")] - BackendSqlX::MySql => BackendSqlXMySQL::execute_query(_query, _pool, _params).await, + BackendSqlX::MySql => { + BackendSqlXMySQL::execute_query(_query, _pool.unwrap_mysql_pool(), _params).await + } _ => unreachable!(), } } @@ -133,46 +147,9 @@ pub(crate) enum SqlXQuery { use crate::AsyncQueueError; use crate::AsyncRunnable; use crate::FangTaskState; +use crate::InternalPool; use crate::Task; -#[allow(dead_code)] -async fn general_any_impl_insert_task_if_not_exists( - queries: (&str, &str), - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - match general_any_impl_find_task_by_uniq_hash(queries.0, pool, ¶ms).await { - Some(task) => Ok(task), - None => general_any_impl_insert_task_uniq(queries.1, pool, params).await, - } -} - -#[allow(dead_code)] -async fn general_any_impl_insert_task( - query: &str, - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - let uuid = Uuid::new_v4(); - let mut buffer = Uuid::encode_buffer(); - let uuid_as_str: &str = uuid.as_hyphenated().encode_lower(&mut buffer); - - let scheduled_at_str = format!("{}", params.scheduled_at.unwrap().format("%F %T%.f+00")); - - let metadata_str = params.metadata.unwrap().to_string(); - let task_type = params.task_type.unwrap(); - - let task: Task = sqlx::query_as(query) - .bind(uuid_as_str) - .bind(metadata_str) - .bind(task_type) - .bind(scheduled_at_str) - .fetch_one(pool) - .await?; - - Ok(task) -} - #[allow(dead_code)] pub(crate) fn calculate_hash(json: &str) -> String { let mut hasher = Sha256::new(); @@ -181,258 +158,304 @@ pub(crate) fn calculate_hash(json: &str) -> String { hex::encode(result) } -#[allow(dead_code)] -async fn general_any_impl_insert_task_uniq( - query: &str, - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - let uuid = Uuid::new_v4(); - let mut buffer = Uuid::encode_buffer(); - let uuid_as_str: &str = uuid.as_hyphenated().encode_lower(&mut buffer); - - let metadata = params.metadata.unwrap(); - - let metadata_str = metadata.to_string(); - let scheduled_at_str = format!("{}", params.scheduled_at.unwrap().format("%F %T%.f+00")); - - let task_type = params.task_type.unwrap(); - - let uniq_hash = calculate_hash(&metadata_str); - - let task: Task = sqlx::query_as(query) - .bind(uuid_as_str) - .bind(metadata_str) - .bind(task_type) - .bind(uniq_hash) - .bind(scheduled_at_str) - .fetch_one(pool) - .await?; - Ok(task) -} +trait FangQueryable +where + DB: Database, + for<'r> Task: FromRow<'r, ::Row>, + for<'r> std::string::String: Encode<'r, DB> + Type, + for<'r> &'r str: Encode<'r, DB> + Type, + for<'r> i32: Encode<'r, DB> + Type, + for<'r> &'r Pool: Executor<'r, Database = DB>, + for<'r> >::Arguments: IntoArguments<'r, DB>, + ::QueryResult: Into, +{ + async fn fetch_task_type( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let task_type = params.task_type.unwrap(); + + let now_str = format!("{}", Utc::now().format("%F %T%.f+00")); + + let task: Task = sqlx::query_as(query) + .bind(task_type) + .bind(now_str) + .fetch_one(pool) + .await?; + + Ok(task) + } -#[allow(dead_code)] -async fn general_any_impl_update_task_state( - query: &str, - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - let updated_at_str = format!("{}", Utc::now().format("%F %T%.f+00")); + async fn find_task_by_uniq_hash( + query: &str, + pool: &Pool, + params: &QueryParams<'_>, + ) -> Option { + let metadata = params.metadata.unwrap(); - let state_str: &str = params.state.unwrap().into(); + let uniq_hash = calculate_hash(&metadata.to_string()); - let uuid = params.uuid.unwrap(); + sqlx::query_as(query) + .bind(uniq_hash) + .fetch_one(pool) + .await + .ok() + } - let mut buffer = Uuid::encode_buffer(); - let uuid_as_text = uuid.as_hyphenated().encode_lower(&mut buffer); + async fn find_task_by_id( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let mut buffer = Uuid::encode_buffer(); + let uuid_as_text = params + .uuid + .unwrap() + .as_hyphenated() + .encode_lower(&mut buffer); + + let task: Task = sqlx::query_as(query) + .bind(&*uuid_as_text) + .fetch_one(pool) + .await?; + + Ok(task) + } - let task: Task = sqlx::query_as(query) - .bind(state_str) - .bind(updated_at_str) - .bind(&*uuid_as_text) - .fetch_one(pool) - .await?; + async fn retry_task( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let now = Utc::now(); + let now_str = format!("{}", now.format("%F %T%.f+00")); + + let scheduled_at = now + Duration::seconds(params.backoff_seconds.unwrap() as i64); + let scheduled_at_str = format!("{}", scheduled_at.format("%F %T%.f+00")); + let retries = params.task.unwrap().retries + 1; + + let mut buffer = Uuid::encode_buffer(); + let uuid_as_text = params + .task + .unwrap() + .id + .as_hyphenated() + .encode_lower(&mut buffer); + + let error = params.error_message.unwrap(); + + let failed_task: Task = sqlx::query_as(query) + .bind(error) + .bind(retries) + .bind(scheduled_at_str) + .bind(now_str) + .bind(&*uuid_as_text) + .fetch_one(pool) + .await?; + + Ok(failed_task) + } - Ok(task) -} + async fn insert_task_uniq( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let uuid = Uuid::new_v4(); + let mut buffer = Uuid::encode_buffer(); + let uuid_as_str: &str = uuid.as_hyphenated().encode_lower(&mut buffer); + + let metadata = params.metadata.unwrap(); + + let metadata_str = metadata.to_string(); + let scheduled_at_str = format!("{}", params.scheduled_at.unwrap().format("%F %T%.f+00")); + + let task_type = params.task_type.unwrap(); + + let uniq_hash = calculate_hash(&metadata_str); + + let task: Task = sqlx::query_as(query) + .bind(uuid_as_str) + .bind(metadata_str) + .bind(task_type) + .bind(uniq_hash) + .bind(scheduled_at_str) + .fetch_one(pool) + .await?; + Ok(task) + } -#[allow(dead_code)] -async fn general_any_impl_fail_task( - query: &str, - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - let updated_at = format!("{}", Utc::now().format("%F %T%.f+00")); + async fn insert_task( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let uuid = Uuid::new_v4(); + let mut buffer = Uuid::encode_buffer(); + let uuid_as_str: &str = uuid.as_hyphenated().encode_lower(&mut buffer); + + let scheduled_at_str = format!("{}", params.scheduled_at.unwrap().format("%F %T%.f+00")); + + let metadata_str = params.metadata.unwrap().to_string(); + let task_type = params.task_type.unwrap(); + + let task: Task = sqlx::query_as(query) + .bind(uuid_as_str) + .bind(metadata_str) + .bind(task_type) + .bind(scheduled_at_str) + .fetch_one(pool) + .await?; + + Ok(task) + } - let id = params.task.unwrap().id; + async fn update_task_state( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let updated_at_str = format!("{}", Utc::now().format("%F %T%.f+00")); - let mut buffer = Uuid::encode_buffer(); - let uuid_as_text = id.as_hyphenated().encode_lower(&mut buffer); + let state_str: &str = params.state.unwrap().into(); - let error_message = params.error_message.unwrap(); + let uuid = params.uuid.unwrap(); - let failed_task: Task = sqlx::query_as(query) - .bind(<&str>::from(FangTaskState::Failed)) - .bind(error_message) - .bind(updated_at) - .bind(&*uuid_as_text) - .fetch_one(pool) - .await?; + let mut buffer = Uuid::encode_buffer(); + let uuid_as_text = uuid.as_hyphenated().encode_lower(&mut buffer); - Ok(failed_task) -} + let task: Task = sqlx::query_as(query) + .bind(state_str) + .bind(updated_at_str) + .bind(&*uuid_as_text) + .fetch_one(pool) + .await?; -#[allow(dead_code)] -async fn general_any_impl_remove_all_task( - query: &str, - pool: &Pool, -) -> Result { - Ok(sqlx::query(query).execute(pool).await?.rows_affected()) -} + Ok(task) + } -#[allow(dead_code)] -async fn general_any_impl_remove_all_scheduled_tasks( - query: &str, - pool: &Pool, -) -> Result { - let now_str = format!("{}", Utc::now().format("%F %T%.f+00")); - - Ok(sqlx::query(query) - .bind(now_str) - .execute(pool) - .await? - .rows_affected()) -} + async fn fail_task( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let updated_at = format!("{}", Utc::now().format("%F %T%.f+00")); -#[allow(dead_code)] -async fn general_any_impl_remove_task( - query: &str, - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - let mut buffer = Uuid::encode_buffer(); - let uuid_as_text = params - .uuid - .unwrap() - .as_hyphenated() - .encode_lower(&mut buffer); - - let result = sqlx::query(query) - .bind(&*uuid_as_text) - .execute(pool) - .await? - .rows_affected(); - - if result != 1 { - Err(AsyncQueueError::ResultError { - expected: 1, - found: result, - }) - } else { - Ok(result) + let id = params.task.unwrap().id; + + let mut buffer = Uuid::encode_buffer(); + let uuid_as_text = id.as_hyphenated().encode_lower(&mut buffer); + + let error_message = params.error_message.unwrap(); + + let failed_task: Task = sqlx::query_as(query) + .bind(<&str>::from(FangTaskState::Failed)) + .bind(error_message) + .bind(updated_at) + .bind(&*uuid_as_text) + .fetch_one(pool) + .await?; + + Ok(failed_task) } -} -#[allow(dead_code)] -async fn general_any_impl_remove_task_by_metadata( - query: &str, - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - let metadata = serde_json::to_value(params.runnable.unwrap())?; - - let uniq_hash = calculate_hash(&metadata.to_string()); - - Ok(sqlx::query(query) - .bind(uniq_hash) - .execute(pool) - .await? - .rows_affected()) -} + async fn remove_all_task(query: &str, pool: &Pool) -> Result { + // This converts QueryResult to AnyQueryResult and then to u64 + // do not delete into() method and do not delete Into trait bound + Ok(sqlx::query(query) + .execute(pool) + .await? + .into() + .rows_affected()) + } -#[allow(dead_code)] -async fn general_any_impl_remove_task_type( - query: &str, - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - let task_type = params.task_type.unwrap(); - - Ok(sqlx::query(query) - .bind(task_type) - .execute(pool) - .await? - .rows_affected()) -} + async fn remove_all_scheduled_tasks( + query: &str, + pool: &Pool, + ) -> Result { + let now_str = format!("{}", Utc::now().format("%F %T%.f+00")); + + // This converts QueryResult to AnyQueryResult and then to u64 + // do not delete into() method and do not delete Into trait bound + + Ok(sqlx::query(query) + .bind(now_str) + .execute(pool) + .await? + .into() + .rows_affected()) + } -#[allow(dead_code)] -async fn general_any_impl_fetch_task_type( - query: &str, - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - let task_type = params.task_type.unwrap(); - - let now_str = format!("{}", Utc::now().format("%F %T%.f+00")); - - let task: Task = sqlx::query_as(query) - .bind(task_type) - .bind(now_str) - .fetch_one(pool) - .await?; - - Ok(task) -} + async fn remove_task( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let mut buffer = Uuid::encode_buffer(); + let uuid_as_text = params + .uuid + .unwrap() + .as_hyphenated() + .encode_lower(&mut buffer); + + let result = sqlx::query(query) + .bind(&*uuid_as_text) + .execute(pool) + .await? + .into() + .rows_affected(); + + if result != 1 { + Err(AsyncQueueError::ResultError { + expected: 1, + found: result, + }) + } else { + Ok(result) + } + } -#[allow(dead_code)] -async fn general_any_impl_find_task_by_uniq_hash( - query: &str, - pool: &Pool, - params: &QueryParams<'_>, -) -> Option { - let metadata = params.metadata.unwrap(); - - let uniq_hash = calculate_hash(&metadata.to_string()); - - sqlx::query_as(query) - .bind(uniq_hash) - .fetch_one(pool) - .await - .ok() -} + async fn remove_task_by_metadata( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let metadata = serde_json::to_value(params.runnable.unwrap())?; + + let uniq_hash = calculate_hash(&metadata.to_string()); + + Ok(sqlx::query(query) + .bind(uniq_hash) + .execute(pool) + .await? + .into() + .rows_affected()) + } -#[allow(dead_code)] -async fn general_any_impl_find_task_by_id( - query: &str, - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - let mut buffer = Uuid::encode_buffer(); - let uuid_as_text = params - .uuid - .unwrap() - .as_hyphenated() - .encode_lower(&mut buffer); - - let task: Task = sqlx::query_as(query) - .bind(&*uuid_as_text) - .fetch_one(pool) - .await?; - - Ok(task) -} + async fn remove_task_type( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let task_type = params.task_type.unwrap(); + + Ok(sqlx::query(query) + .bind(task_type) + .execute(pool) + .await? + .into() + .rows_affected()) + } -#[allow(dead_code)] -async fn general_any_impl_retry_task( - query: &str, - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - let now = Utc::now(); - let now_str = format!("{}", now.format("%F %T%.f+00")); - - let scheduled_at = now + Duration::seconds(params.backoff_seconds.unwrap() as i64); - let scheduled_at_str = format!("{}", scheduled_at.format("%F %T%.f+00")); - let retries = params.task.unwrap().retries + 1; - - let mut buffer = Uuid::encode_buffer(); - let uuid_as_text = params - .task - .unwrap() - .id - .as_hyphenated() - .encode_lower(&mut buffer); - - let error = params.error_message.unwrap(); - - let failed_task: Task = sqlx::query_as(query) - .bind(error) - .bind(retries) - .bind(scheduled_at_str) - .bind(now_str) - .bind(&*uuid_as_text) - .fetch_one(pool) - .await?; - - Ok(failed_task) + async fn insert_task_if_not_exists( + queries: (&str, &str), + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + match Self::find_task_by_uniq_hash(queries.0, pool, ¶ms).await { + Some(task) => Ok(task), + None => Self::insert_task_uniq(queries.1, pool, params).await, + } + } } diff --git a/fang/src/asynk/backend_sqlx/mysql.rs b/fang/src/asynk/backend_sqlx/mysql.rs index 6c49815..f7c6044 100644 --- a/fang/src/asynk/backend_sqlx/mysql.rs +++ b/fang/src/asynk/backend_sqlx/mysql.rs @@ -15,61 +15,399 @@ const FIND_TASK_BY_UNIQ_HASH_QUERY_MYSQL: &str = const FIND_TASK_BY_ID_QUERY_MYSQL: &str = include_str!("../queries_mysql/find_task_by_id.sql"); const RETRY_TASK_QUERY_MYSQL: &str = include_str!("../queries_mysql/retry_task.sql"); -use super::general_any_impl_fetch_task_type; -use super::general_any_impl_find_task_by_id; -use super::general_any_impl_find_task_by_uniq_hash; -use super::general_any_impl_remove_all_scheduled_tasks; -use super::general_any_impl_remove_all_task; -use super::general_any_impl_remove_task; -use super::general_any_impl_remove_task_by_metadata; -use super::general_any_impl_remove_task_type; -use super::{calculate_hash, QueryParams, Res, SqlXQuery}; -use crate::{AsyncQueueError, FangTaskState, Task}; -use SqlXQuery as Q; - use chrono::Duration; -use chrono::Utc; -use sqlx::{Any, Pool}; +use chrono::{DateTime, Utc}; +use sqlx::mysql::MySqlQueryResult; +use sqlx::mysql::MySqlRow; +use sqlx::FromRow; +use sqlx::MySql; +use sqlx::Pool; +use sqlx::Row; use uuid::Uuid; +use SqlXQuery as Q; + +use super::FangQueryable; +use super::{calculate_hash, QueryParams, Res, SqlXQuery}; +use crate::{AsyncQueueError, FangTaskState, Task}; #[derive(Debug, Clone)] -pub(crate) struct BackendSqlXMySQL {} +pub(super) struct BackendSqlXMySQL {} + +impl<'a> FromRow<'a, MySqlRow> for Task { + fn from_row(row: &'a MySqlRow) -> Result { + let uuid_as_text: &str = row.get("id"); + + let id = Uuid::parse_str(uuid_as_text).unwrap(); + + let raw: &str = row.get("metadata"); // will work if database cast json to string + let raw = raw.replace('\\', ""); + + // -- SELECT metadata->>'type' FROM fang_tasks ; this works because jsonb casting + let metadata: serde_json::Value = serde_json::from_str(&raw).unwrap(); + + // Be careful with this if we update sqlx, https://github.com/launchbadge/sqlx/issues/2416 + let error_message: Option = row.get("error_message"); + + let state_str: &str = row.get("state"); // will work if database cast json to string + + let state: FangTaskState = state_str.into(); + + let task_type: String = row.get("task_type"); + + // Be careful with this if we update sqlx, https://github.com/launchbadge/sqlx/issues/2416 + let uniq_hash: Option = row.get("uniq_hash"); + + let retries: i32 = row.get("retries"); + + let scheduled_at_str: &str = row.get("scheduled_at"); + + // This unwrap is safe because we know that the database returns the date in the correct format + let scheduled_at: DateTime = DateTime::parse_from_str(scheduled_at_str, "%F %T%.f%#z") + .unwrap() + .into(); + + let created_at_str: &str = row.get("created_at"); + + // This unwrap is safe because we know that the database returns the date in the correct format + let created_at: DateTime = DateTime::parse_from_str(created_at_str, "%F %T%.f%#z") + .unwrap() + .into(); + + let updated_at_str: &str = row.get("updated_at"); + + // This unwrap is safe because we know that the database returns the date in the correct format + let updated_at: DateTime = DateTime::parse_from_str(updated_at_str, "%F %T%.f%#z") + .unwrap() + .into(); + + Ok(Task::builder() + .id(id) + .metadata(metadata) + .error_message(error_message) + .state(state) + .task_type(task_type) + .uniq_hash(uniq_hash) + .retries(retries) + .scheduled_at(scheduled_at) + .created_at(created_at) + .updated_at(updated_at) + .build()) + } +} + +impl FangQueryable for BackendSqlXMySQL { + async fn insert_task( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let uuid = Uuid::new_v4(); + let mut buffer = Uuid::encode_buffer(); + let uuid_as_str: &str = uuid.as_hyphenated().encode_lower(&mut buffer); + + let scheduled_at_str = format!("{}", params.scheduled_at.unwrap().format("%F %T%.f+00")); + + let metadata_str = params.metadata.unwrap().to_string(); + let task_type = params.task_type.unwrap(); + + let affected_rows = Into::::into( + sqlx::query(query) + .bind(uuid_as_str) + .bind(metadata_str) + .bind(task_type) + .bind(scheduled_at_str) + .execute(pool) + .await?, + ) + .rows_affected(); + + if affected_rows != 1 { + return Err(AsyncQueueError::ResultError { + expected: 1, + found: affected_rows, + }); + } + + let query_params = QueryParams::builder().uuid(&uuid).build(); + + let task: Task = >::find_task_by_id( + FIND_TASK_BY_ID_QUERY_MYSQL, + pool, + query_params, + ) + .await?; + + Ok(task) + } + + async fn update_task_state( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let updated_at_str = format!("{}", Utc::now().format("%F %T%.f+00")); + + let state_str: &str = params.state.unwrap().into(); + + let uuid = params.uuid.unwrap(); + + let mut buffer = Uuid::encode_buffer(); + let uuid_as_text = uuid.as_hyphenated().encode_lower(&mut buffer); + + let affected_rows = Into::::into( + sqlx::query(query) + .bind(state_str) + .bind(updated_at_str) + .bind(&*uuid_as_text) + .execute(pool) + .await?, + ) + .rows_affected(); + + if affected_rows != 1 { + return Err(AsyncQueueError::ResultError { + expected: 1, + found: affected_rows, + }); + } + + let query_params = QueryParams::builder().uuid(params.uuid.unwrap()).build(); + + let task: Task = >::find_task_by_id( + FIND_TASK_BY_ID_QUERY_MYSQL, + pool, + query_params, + ) + .await?; + + Ok(task) + } + + async fn insert_task_uniq( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let uuid = Uuid::new_v4(); + let mut buffer = Uuid::encode_buffer(); + let uuid_as_str: &str = uuid.as_hyphenated().encode_lower(&mut buffer); + + let metadata = params.metadata.unwrap(); + + let metadata_str = metadata.to_string(); + let scheduled_at_str = format!("{}", params.scheduled_at.unwrap().format("%F %T%.f+00")); + + let task_type = params.task_type.unwrap(); + + let uniq_hash = calculate_hash(&metadata_str); + + let affected_rows = Into::::into( + sqlx::query(query) + .bind(uuid_as_str) + .bind(metadata_str) + .bind(task_type) + .bind(uniq_hash) + .bind(scheduled_at_str) + .execute(pool) + .await?, + ) + .rows_affected(); + + if affected_rows != 1 { + return Err(AsyncQueueError::ResultError { + expected: 1, + found: affected_rows, + }); + } + + let query_params = QueryParams::builder().uuid(&uuid).build(); + + let task: Task = >::find_task_by_id( + FIND_TASK_BY_ID_QUERY_MYSQL, + pool, + query_params, + ) + .await?; + + Ok(task) + } + + async fn fail_task( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let updated_at = format!("{}", Utc::now().format("%F %T%.f+00")); + + let id = params.task.unwrap().id; + + let mut buffer = Uuid::encode_buffer(); + let uuid_as_text = id.as_hyphenated().encode_lower(&mut buffer); + + let error_message = params.error_message.unwrap(); + + let affected_rows = Into::::into( + sqlx::query(query) + .bind(<&str>::from(FangTaskState::Failed)) + .bind(error_message) + .bind(updated_at) + .bind(&*uuid_as_text) + .execute(pool) + .await?, + ) + .rows_affected(); + + if affected_rows != 1 { + return Err(AsyncQueueError::ResultError { + expected: 1, + found: affected_rows, + }); + } + + let query_params = QueryParams::builder().uuid(&id).build(); + + let failed_task: Task = >::find_task_by_id( + FIND_TASK_BY_ID_QUERY_MYSQL, + pool, + query_params, + ) + .await?; + + Ok(failed_task) + } + + async fn retry_task( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let now = Utc::now(); + let now_str = format!("{}", now.format("%F %T%.f+00")); + + let scheduled_at = now + Duration::seconds(params.backoff_seconds.unwrap() as i64); + let scheduled_at_str = format!("{}", scheduled_at.format("%F %T%.f+00")); + let retries = params.task.unwrap().retries + 1; + + let uuid = params.task.unwrap().id; + + let mut buffer = Uuid::encode_buffer(); + let uuid_as_text = uuid.as_hyphenated().encode_lower(&mut buffer); + + let error = params.error_message.unwrap(); + + let affected_rows = Into::::into( + sqlx::query(query) + .bind(error) + .bind(retries) + .bind(scheduled_at_str) + .bind(now_str) + .bind(&*uuid_as_text) + .execute(pool) + .await?, + ) + .rows_affected(); + + if affected_rows != 1 { + return Err(AsyncQueueError::ResultError { + expected: 1, + found: affected_rows, + }); + } + + let query_params = QueryParams::builder().uuid(&uuid).build(); + + let failed_task: Task = >::find_task_by_id( + FIND_TASK_BY_ID_QUERY_MYSQL, + pool, + query_params, + ) + .await?; + + Ok(failed_task) + } + + async fn insert_task_if_not_exists( + queries: (&str, &str), + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + match >::find_task_by_uniq_hash( + queries.0, pool, ¶ms, + ) + .await + { + Some(task) => Ok(task), + None => { + >::insert_task_uniq( + queries.1, pool, params, + ) + .await + } + } + } + + async fn find_task_by_id( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let mut buffer = Uuid::encode_buffer(); + let uuid_as_text = params + .uuid + .unwrap() + .as_hyphenated() + .encode_lower(&mut buffer); + + let task: Task = sqlx::query_as(query) + .bind(&*uuid_as_text) + .fetch_one(pool) + .await?; + + Ok(task) + } +} impl BackendSqlXMySQL { pub(super) async fn execute_query( query: SqlXQuery, - pool: &Pool, + pool: &Pool, params: QueryParams<'_>, ) -> Result { match query { Q::InsertTask => { - let task = mysql_impl_insert_task(INSERT_TASK_QUERY_MYSQL, pool, params).await?; + let task = >::insert_task( + INSERT_TASK_QUERY_MYSQL, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::UpdateTaskState => { - let task = - mysql_impl_update_task_state(UPDATE_TASK_STATE_QUERY_MYSQL, pool, params) - .await?; + let task = >::update_task_state( + UPDATE_TASK_STATE_QUERY_MYSQL, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::FailTask => { - let task = mysql_impl_fail_task(FAIL_TASK_QUERY_MYSQL, pool, params).await?; + let task = >::fail_task( + FAIL_TASK_QUERY_MYSQL, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::RemoveAllTask => { - let affected_rows = - general_any_impl_remove_all_task(REMOVE_ALL_TASK_QUERY_MYSQL, pool).await?; - - Ok(Res::Bigint(affected_rows)) - } - - Q::RemoveAllScheduledTask => { - let affected_rows = general_any_impl_remove_all_scheduled_tasks( - REMOVE_ALL_SCHEDULED_TASK_QUERY_MYSQL, + let affected_rows = >::remove_all_task( + REMOVE_ALL_TASK_QUERY_MYSQL, pool, ) .await?; @@ -77,15 +415,20 @@ impl BackendSqlXMySQL { Ok(Res::Bigint(affected_rows)) } - Q::RemoveTask => { + Q::RemoveAllScheduledTask => { let affected_rows = - general_any_impl_remove_task(REMOVE_TASK_QUERY_MYSQL, pool, params).await?; + >::remove_all_scheduled_tasks( + REMOVE_ALL_SCHEDULED_TASK_QUERY_MYSQL, + pool, + ) + .await?; Ok(Res::Bigint(affected_rows)) } - Q::RemoveTaskByMetadata => { - let affected_rows = general_any_impl_remove_task_by_metadata( - REMOVE_TASK_BY_METADATA_QUERY_MYSQL, + + Q::RemoveTask => { + let affected_rows = >::remove_task( + REMOVE_TASK_QUERY_MYSQL, pool, params, ) @@ -93,33 +436,58 @@ impl BackendSqlXMySQL { Ok(Res::Bigint(affected_rows)) } - Q::RemoveTaskType => { + Q::RemoveTaskByMetadata => { let affected_rows = - general_any_impl_remove_task_type(REMOVE_TASKS_TYPE_QUERY_MYSQL, pool, params) - .await?; + >::remove_task_by_metadata( + REMOVE_TASK_BY_METADATA_QUERY_MYSQL, + pool, + params, + ) + .await?; + + Ok(Res::Bigint(affected_rows)) + } + Q::RemoveTaskType => { + let affected_rows = >::remove_task_type( + REMOVE_TASKS_TYPE_QUERY_MYSQL, + pool, + params, + ) + .await?; Ok(Res::Bigint(affected_rows)) } Q::FetchTaskType => { - let task = - general_any_impl_fetch_task_type(FETCH_TASK_TYPE_QUERY_MYSQL, pool, params) - .await?; + let task = >::fetch_task_type( + FETCH_TASK_TYPE_QUERY_MYSQL, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::FindTaskById => { - let task: Task = - general_any_impl_find_task_by_id(FIND_TASK_BY_ID_QUERY_MYSQL, pool, params) - .await?; + let task: Task = >::find_task_by_id( + FIND_TASK_BY_ID_QUERY_MYSQL, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::RetryTask => { - let task = mysql_impl_retry_task(RETRY_TASK_QUERY_MYSQL, pool, params).await?; + let task = >::retry_task( + RETRY_TASK_QUERY_MYSQL, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::InsertTaskIfNotExists => { - let task = mysql_any_impl_insert_task_if_not_exists( + let task = >::insert_task_if_not_exists( ( FIND_TASK_BY_UNIQ_HASH_QUERY_MYSQL, INSERT_TASK_UNIQ_QUERY_MYSQL, @@ -138,214 +506,3 @@ impl BackendSqlXMySQL { "MySQL" } } - -async fn mysql_impl_insert_task( - query: &str, - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - let uuid = Uuid::new_v4(); - let mut buffer = Uuid::encode_buffer(); - let uuid_as_str: &str = uuid.as_hyphenated().encode_lower(&mut buffer); - - let scheduled_at_str = format!("{}", params.scheduled_at.unwrap().format("%F %T%.f+00")); - - let metadata_str = params.metadata.unwrap().to_string(); - let task_type = params.task_type.unwrap(); - - let affected_rows = sqlx::query(query) - .bind(uuid_as_str) - .bind(metadata_str) - .bind(task_type) - .bind(scheduled_at_str) - .execute(pool) - .await? - .rows_affected(); - - if affected_rows != 1 { - return Err(AsyncQueueError::ResultError { - expected: 1, - found: affected_rows, - }); - } - - let query_params = QueryParams::builder().uuid(&uuid).build(); - - let task: Task = - general_any_impl_find_task_by_id(FIND_TASK_BY_ID_QUERY_MYSQL, pool, query_params).await?; - - Ok(task) -} - -async fn mysql_impl_insert_task_uniq( - query: &str, - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - let uuid = Uuid::new_v4(); - let mut buffer = Uuid::encode_buffer(); - let uuid_as_str: &str = uuid.as_hyphenated().encode_lower(&mut buffer); - - let metadata = params.metadata.unwrap(); - - let metadata_str = metadata.to_string(); - let scheduled_at_str = format!("{}", params.scheduled_at.unwrap().format("%F %T%.f+00")); - - let task_type = params.task_type.unwrap(); - - let uniq_hash = calculate_hash(&metadata_str); - - let affected_rows = sqlx::query(query) - .bind(uuid_as_str) - .bind(metadata_str) - .bind(task_type) - .bind(uniq_hash) - .bind(scheduled_at_str) - .execute(pool) - .await? - .rows_affected(); - - if affected_rows != 1 { - return Err(AsyncQueueError::ResultError { - expected: 1, - found: affected_rows, - }); - } - - let query_params = QueryParams::builder().uuid(&uuid).build(); - - let task: Task = - general_any_impl_find_task_by_id(FIND_TASK_BY_ID_QUERY_MYSQL, pool, query_params).await?; - - Ok(task) -} - -async fn mysql_impl_update_task_state( - query: &str, - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - let updated_at_str = format!("{}", Utc::now().format("%F %T%.f+00")); - - let state_str: &str = params.state.unwrap().into(); - - let uuid = params.uuid.unwrap(); - - let mut buffer = Uuid::encode_buffer(); - let uuid_as_text = uuid.as_hyphenated().encode_lower(&mut buffer); - - let affected_rows = sqlx::query(query) - .bind(state_str) - .bind(updated_at_str) - .bind(&*uuid_as_text) - .execute(pool) - .await? - .rows_affected(); - - if affected_rows != 1 { - return Err(AsyncQueueError::ResultError { - expected: 1, - found: affected_rows, - }); - } - - let query_params = QueryParams::builder().uuid(params.uuid.unwrap()).build(); - - let task: Task = - general_any_impl_find_task_by_id(FIND_TASK_BY_ID_QUERY_MYSQL, pool, query_params).await?; - - Ok(task) -} - -async fn mysql_impl_fail_task( - query: &str, - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - let updated_at = format!("{}", Utc::now().format("%F %T%.f+00")); - - let id = params.task.unwrap().id; - - let mut buffer = Uuid::encode_buffer(); - let uuid_as_text = id.as_hyphenated().encode_lower(&mut buffer); - - let error_message = params.error_message.unwrap(); - - let affected_rows = sqlx::query(query) - .bind(<&str>::from(FangTaskState::Failed)) - .bind(error_message) - .bind(updated_at) - .bind(&*uuid_as_text) - .execute(pool) - .await? - .rows_affected(); - - if affected_rows != 1 { - return Err(AsyncQueueError::ResultError { - expected: 1, - found: affected_rows, - }); - } - - let query_params = QueryParams::builder().uuid(&id).build(); - - let failed_task: Task = - general_any_impl_find_task_by_id(FIND_TASK_BY_ID_QUERY_MYSQL, pool, query_params).await?; - - Ok(failed_task) -} - -async fn mysql_impl_retry_task( - query: &str, - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - let now = Utc::now(); - let now_str = format!("{}", now.format("%F %T%.f+00")); - - let scheduled_at = now + Duration::seconds(params.backoff_seconds.unwrap() as i64); - let scheduled_at_str = format!("{}", scheduled_at.format("%F %T%.f+00")); - let retries = params.task.unwrap().retries + 1; - - let uuid = params.task.unwrap().id; - - let mut buffer = Uuid::encode_buffer(); - let uuid_as_text = uuid.as_hyphenated().encode_lower(&mut buffer); - - let error = params.error_message.unwrap(); - - let affected_rows = sqlx::query(query) - .bind(error) - .bind(retries) - .bind(scheduled_at_str) - .bind(now_str) - .bind(&*uuid_as_text) - .execute(pool) - .await? - .rows_affected(); - - if affected_rows != 1 { - return Err(AsyncQueueError::ResultError { - expected: 1, - found: affected_rows, - }); - } - - let query_params = QueryParams::builder().uuid(&uuid).build(); - - let failed_task: Task = - general_any_impl_find_task_by_id(FIND_TASK_BY_ID_QUERY_MYSQL, pool, query_params).await?; - - Ok(failed_task) -} - -async fn mysql_any_impl_insert_task_if_not_exists( - queries: (&str, &str), - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - match general_any_impl_find_task_by_uniq_hash(queries.0, pool, ¶ms).await { - Some(task) => Ok(task), - None => mysql_impl_insert_task_uniq(queries.1, pool, params).await, - } -} diff --git a/fang/src/asynk/backend_sqlx/postgres.rs b/fang/src/asynk/backend_sqlx/postgres.rs index a5185a1..5afe304 100644 --- a/fang/src/asynk/backend_sqlx/postgres.rs +++ b/fang/src/asynk/backend_sqlx/postgres.rs @@ -24,40 +24,105 @@ const RETRY_TASK_QUERY_POSTGRES: &str = include_str!("../queries_postgres/retry_ #[derive(Debug, Clone)] pub(super) struct BackendSqlXPg {} +use chrono::DateTime; +use chrono::Utc; +use sqlx::postgres::PgRow; +use sqlx::FromRow; +use sqlx::Pool; +use sqlx::Postgres; +use sqlx::Row; +use uuid::Uuid; use SqlXQuery as Q; +use super::FangQueryable; +use super::{QueryParams, Res, SqlXQuery}; use crate::AsyncQueueError; +use crate::FangTaskState; +use crate::Task; -use super::general_any_impl_fail_task; -use super::general_any_impl_fetch_task_type; -use super::general_any_impl_find_task_by_id; -use super::general_any_impl_insert_task; -use super::general_any_impl_insert_task_if_not_exists; -use super::general_any_impl_remove_all_scheduled_tasks; -use super::general_any_impl_remove_all_task; -use super::general_any_impl_remove_task; -use super::general_any_impl_remove_task_by_metadata; -use super::general_any_impl_remove_task_type; -use super::general_any_impl_retry_task; -use super::general_any_impl_update_task_state; -use super::{QueryParams, Res, SqlXQuery}; -use sqlx::{Any, Pool}; +impl<'a> FromRow<'a, PgRow> for Task { + fn from_row(row: &'a PgRow) -> Result { + let uuid_as_text: &str = row.get("id"); + + let id = Uuid::parse_str(uuid_as_text).unwrap(); + + let raw: &str = row.get("metadata"); // will work if database cast json to string + let raw = raw.replace('\\', ""); + + // -- SELECT metadata->>'type' FROM fang_tasks ; this works because jsonb casting + let metadata: serde_json::Value = serde_json::from_str(&raw).unwrap(); + + // Be careful with this if we update sqlx, https://github.com/launchbadge/sqlx/issues/2416 + let error_message: Option = row.get("error_message"); + + let state_str: &str = row.get("state"); // will work if database cast json to string + + let state: FangTaskState = state_str.into(); + + let task_type: String = row.get("task_type"); + + // Be careful with this if we update sqlx, https://github.com/launchbadge/sqlx/issues/2416 + let uniq_hash: Option = row.get("uniq_hash"); + + let retries: i32 = row.get("retries"); + + let scheduled_at_str: &str = row.get("scheduled_at"); + + // This unwrap is safe because we know that the database returns the date in the correct format + let scheduled_at: DateTime = DateTime::parse_from_str(scheduled_at_str, "%F %T%.f%#z") + .unwrap() + .into(); + + let created_at_str: &str = row.get("created_at"); + + // This unwrap is safe because we know that the database returns the date in the correct format + let created_at: DateTime = DateTime::parse_from_str(created_at_str, "%F %T%.f%#z") + .unwrap() + .into(); + + let updated_at_str: &str = row.get("updated_at"); + + // This unwrap is safe because we know that the database returns the date in the correct format + let updated_at: DateTime = DateTime::parse_from_str(updated_at_str, "%F %T%.f%#z") + .unwrap() + .into(); + + Ok(Task::builder() + .id(id) + .metadata(metadata) + .error_message(error_message) + .state(state) + .task_type(task_type) + .uniq_hash(uniq_hash) + .retries(retries) + .scheduled_at(scheduled_at) + .created_at(created_at) + .updated_at(updated_at) + .build()) + } +} + +impl FangQueryable for BackendSqlXPg {} impl BackendSqlXPg { pub(super) async fn execute_query( query: SqlXQuery, - pool: &Pool, + pool: &Pool, params: QueryParams<'_>, ) -> Result { match query { Q::InsertTask => { - let task = - general_any_impl_insert_task(INSERT_TASK_QUERY_POSTGRES, pool, params).await?; + let task = >::insert_task( + INSERT_TASK_QUERY_POSTGRES, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::UpdateTaskState => { - let task = general_any_impl_update_task_state( + let task = >::update_task_state( UPDATE_TASK_STATE_QUERY_POSTGRES, pool, params, @@ -66,35 +131,37 @@ impl BackendSqlXPg { Ok(Res::Task(task)) } Q::FailTask => { - let task = - general_any_impl_fail_task(FAIL_TASK_QUERY_POSTGRES, pool, params).await?; + let task = >::fail_task( + FAIL_TASK_QUERY_POSTGRES, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::RemoveAllTask => { - let affected_rows = - general_any_impl_remove_all_task(REMOVE_ALL_TASK_QUERY_POSTGRES, pool).await?; - - Ok(Res::Bigint(affected_rows)) - } - Q::RemoveAllScheduledTask => { - let affected_rows = general_any_impl_remove_all_scheduled_tasks( - REMOVE_ALL_SCHEDULED_TASK_QUERY_POSTGRES, + let affected_rows = >::remove_all_task( + REMOVE_ALL_TASK_QUERY_POSTGRES, pool, ) .await?; Ok(Res::Bigint(affected_rows)) } - Q::RemoveTask => { + Q::RemoveAllScheduledTask => { let affected_rows = - general_any_impl_remove_task(REMOVE_TASK_QUERY_POSTGRES, pool, params).await?; + >::remove_all_scheduled_tasks( + REMOVE_ALL_SCHEDULED_TASK_QUERY_POSTGRES, + pool, + ) + .await?; Ok(Res::Bigint(affected_rows)) } - Q::RemoveTaskByMetadata => { - let affected_rows = general_any_impl_remove_task_by_metadata( - REMOVE_TASK_BY_METADATA_QUERY_POSTGRES, + Q::RemoveTask => { + let affected_rows = >::remove_task( + REMOVE_TASK_QUERY_POSTGRES, pool, params, ) @@ -102,8 +169,19 @@ impl BackendSqlXPg { Ok(Res::Bigint(affected_rows)) } + Q::RemoveTaskByMetadata => { + let affected_rows = + >::remove_task_by_metadata( + REMOVE_TASK_BY_METADATA_QUERY_POSTGRES, + pool, + params, + ) + .await?; + + Ok(Res::Bigint(affected_rows)) + } Q::RemoveTaskType => { - let affected_rows = general_any_impl_remove_task_type( + let affected_rows = >::remove_task_type( REMOVE_TASKS_TYPE_QUERY_POSTGRES, pool, params, @@ -113,25 +191,35 @@ impl BackendSqlXPg { Ok(Res::Bigint(affected_rows)) } Q::FetchTaskType => { - let task = - general_any_impl_fetch_task_type(FETCH_TASK_TYPE_QUERY_POSTGRES, pool, params) - .await?; + let task = >::fetch_task_type( + FETCH_TASK_TYPE_QUERY_POSTGRES, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::FindTaskById => { - let task = - general_any_impl_find_task_by_id(FIND_TASK_BY_ID_QUERY_POSTGRES, pool, params) - .await?; + let task = >::find_task_by_id( + FIND_TASK_BY_ID_QUERY_POSTGRES, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::RetryTask => { - let task = - general_any_impl_retry_task(RETRY_TASK_QUERY_POSTGRES, pool, params).await?; + let task = >::retry_task( + RETRY_TASK_QUERY_POSTGRES, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::InsertTaskIfNotExists => { - let task = general_any_impl_insert_task_if_not_exists( + let task = >::insert_task_if_not_exists( ( FIND_TASK_BY_UNIQ_HASH_QUERY_POSTGRES, INSERT_TASK_UNIQ_QUERY_POSTGRES, diff --git a/fang/src/asynk/backend_sqlx/sqlite.rs b/fang/src/asynk/backend_sqlx/sqlite.rs index af37a34..430c960 100644 --- a/fang/src/asynk/backend_sqlx/sqlite.rs +++ b/fang/src/asynk/backend_sqlx/sqlite.rs @@ -20,39 +20,103 @@ const RETRY_TASK_QUERY_SQLITE: &str = include_str!("../queries_sqlite/retry_task #[derive(Debug, Clone)] pub(super) struct BackendSqlXSQLite {} +use super::FangQueryable; use super::{QueryParams, Res, SqlXQuery}; use crate::AsyncQueueError; -use sqlx::{Any, Pool}; +use crate::FangTaskState; +use crate::Task; +use chrono::{DateTime, Utc}; +use sqlx::sqlite::SqliteRow; +use sqlx::FromRow; +use sqlx::Pool; +use sqlx::Row; +use sqlx::Sqlite; +use uuid::Uuid; use SqlXQuery as Q; -use super::general_any_impl_fail_task; -use super::general_any_impl_fetch_task_type; -use super::general_any_impl_find_task_by_id; -use super::general_any_impl_insert_task; -use super::general_any_impl_insert_task_if_not_exists; -use super::general_any_impl_remove_all_scheduled_tasks; -use super::general_any_impl_remove_all_task; -use super::general_any_impl_remove_task; -use super::general_any_impl_remove_task_by_metadata; -use super::general_any_impl_remove_task_type; -use super::general_any_impl_retry_task; -use super::general_any_impl_update_task_state; +impl<'a> FromRow<'a, SqliteRow> for Task { + fn from_row(row: &'a SqliteRow) -> Result { + let uuid_as_text: &str = row.get("id"); + + let id = Uuid::parse_str(uuid_as_text).unwrap(); + + let raw: &str = row.get("metadata"); // will work if database cast json to string + let raw = raw.replace('\\', ""); + + // -- SELECT metadata->>'type' FROM fang_tasks ; this works because jsonb casting + let metadata: serde_json::Value = serde_json::from_str(&raw).unwrap(); + + // Be careful with this if we update sqlx, https://github.com/launchbadge/sqlx/issues/2416 + let error_message: Option = row.get("error_message"); + + let state_str: &str = row.get("state"); // will work if database cast json to string + + let state: FangTaskState = state_str.into(); + + let task_type: String = row.get("task_type"); + + // Be careful with this if we update sqlx, https://github.com/launchbadge/sqlx/issues/2416 + let uniq_hash: Option = row.get("uniq_hash"); + + let retries: i32 = row.get("retries"); + + let scheduled_at_str: &str = row.get("scheduled_at"); + + // This unwrap is safe because we know that the database returns the date in the correct format + let scheduled_at: DateTime = DateTime::parse_from_str(scheduled_at_str, "%F %T%.f%#z") + .unwrap() + .into(); + + let created_at_str: &str = row.get("created_at"); + + // This unwrap is safe because we know that the database returns the date in the correct format + let created_at: DateTime = DateTime::parse_from_str(created_at_str, "%F %T%.f%#z") + .unwrap() + .into(); + + let updated_at_str: &str = row.get("updated_at"); + + // This unwrap is safe because we know that the database returns the date in the correct format + let updated_at: DateTime = DateTime::parse_from_str(updated_at_str, "%F %T%.f%#z") + .unwrap() + .into(); + + Ok(Task::builder() + .id(id) + .metadata(metadata) + .error_message(error_message) + .state(state) + .task_type(task_type) + .uniq_hash(uniq_hash) + .retries(retries) + .scheduled_at(scheduled_at) + .created_at(created_at) + .updated_at(updated_at) + .build()) + } +} + +impl FangQueryable for BackendSqlXSQLite {} impl BackendSqlXSQLite { pub(super) async fn execute_query( query: SqlXQuery, - pool: &Pool, + pool: &Pool, params: QueryParams<'_>, ) -> Result { match query { Q::InsertTask => { - let task = - general_any_impl_insert_task(INSERT_TASK_QUERY_SQLITE, pool, params).await?; + let task = >::insert_task( + INSERT_TASK_QUERY_SQLITE, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::UpdateTaskState => { - let task = general_any_impl_update_task_state( + let task = >::update_task_state( UPDATE_TASK_STATE_QUERY_SQLITE, pool, params, @@ -61,34 +125,37 @@ impl BackendSqlXSQLite { Ok(Res::Task(task)) } Q::FailTask => { - let task = general_any_impl_fail_task(FAIL_TASK_QUERY_SQLITE, pool, params).await?; + let task = >::fail_task( + FAIL_TASK_QUERY_SQLITE, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::RemoveAllTask => { - let affected_rows = - general_any_impl_remove_all_task(REMOVE_ALL_TASK_QUERY_SQLITE, pool).await?; - - Ok(Res::Bigint(affected_rows)) - } - Q::RemoveAllScheduledTask => { - let affected_rows = general_any_impl_remove_all_scheduled_tasks( - REMOVE_ALL_SCHEDULED_TASK_QUERY_SQLITE, + let affected_rows = >::remove_all_task( + REMOVE_ALL_TASK_QUERY_SQLITE, pool, ) .await?; Ok(Res::Bigint(affected_rows)) } - Q::RemoveTask => { + Q::RemoveAllScheduledTask => { let affected_rows = - general_any_impl_remove_task(REMOVE_TASK_QUERY_SQLITE, pool, params).await?; + >::remove_all_scheduled_tasks( + REMOVE_ALL_SCHEDULED_TASK_QUERY_SQLITE, + pool, + ) + .await?; Ok(Res::Bigint(affected_rows)) } - Q::RemoveTaskByMetadata => { - let affected_rows = general_any_impl_remove_task_by_metadata( - REMOVE_TASK_BY_METADATA_QUERY_SQLITE, + Q::RemoveTask => { + let affected_rows = >::remove_task( + REMOVE_TASK_QUERY_SQLITE, pool, params, ) @@ -96,33 +163,57 @@ impl BackendSqlXSQLite { Ok(Res::Bigint(affected_rows)) } - Q::RemoveTaskType => { + Q::RemoveTaskByMetadata => { let affected_rows = - general_any_impl_remove_task_type(REMOVE_TASKS_TYPE_QUERY_SQLITE, pool, params) - .await?; + >::remove_task_by_metadata( + REMOVE_TASK_BY_METADATA_QUERY_SQLITE, + pool, + params, + ) + .await?; + + Ok(Res::Bigint(affected_rows)) + } + Q::RemoveTaskType => { + let affected_rows = >::remove_task_type( + REMOVE_TASKS_TYPE_QUERY_SQLITE, + pool, + params, + ) + .await?; Ok(Res::Bigint(affected_rows)) } Q::FetchTaskType => { - let task = - general_any_impl_fetch_task_type(FETCH_TASK_TYPE_QUERY_SQLITE, pool, params) - .await?; + let task = >::fetch_task_type( + FETCH_TASK_TYPE_QUERY_SQLITE, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::FindTaskById => { - let task = - general_any_impl_find_task_by_id(FIND_TASK_BY_ID_QUERY_SQLITE, pool, params) - .await?; + let task = >::find_task_by_id( + FIND_TASK_BY_ID_QUERY_SQLITE, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::RetryTask => { - let task = - general_any_impl_retry_task(RETRY_TASK_QUERY_SQLITE, pool, params).await?; + let task = >::retry_task( + RETRY_TASK_QUERY_SQLITE, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::InsertTaskIfNotExists => { - let task = general_any_impl_insert_task_if_not_exists( + let task = >::insert_task_if_not_exists( ( FIND_TASK_BY_UNIQ_HASH_QUERY_SQLITE, INSERT_TASK_UNIQ_QUERY_SQLITE,