Skip to content

Commit

Permalink
Fix/independent decoding (#149)
Browse files Browse the repository at this point in the history
* independent decoding

* Postgres and Sqlite passing

* fix unreachable pattern warn

* delete vscode stuff

* I think this may be a Rust compiler issue, but I fixed it so I will take it

* fix clippy
  • Loading branch information
pxp9 authored Apr 15, 2024
1 parent 0e59175 commit 7ec0e79
Show file tree
Hide file tree
Showing 6 changed files with 1,112 additions and 654 deletions.
167 changes: 133 additions & 34 deletions fang/src/asynk/async_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,35 @@ use async_trait::async_trait;
use chrono::DateTime;
use chrono::Utc;
use cron::Schedule;
//use sqlx::any::install_default_drivers; // this is supported in sqlx 0.7
use sqlx::any::AnyConnectOptions;
use sqlx::any::AnyKind;
#[cfg(any(
feature = "asynk-postgres",
feature = "asynk-mysql",
feature = "asynk-sqlite"
))]
use sqlx::pool::PoolOptions;
use sqlx::Any;
use sqlx::AnyPool;
use sqlx::Pool;
//use sqlx::any::install_default_drivers; // this is supported in sqlx 0.7
use std::str::FromStr;
use thiserror::Error;
use typed_builder::TypedBuilder;
use uuid::Uuid;

#[cfg(feature = "asynk-postgres")]
use sqlx::PgPool;
#[cfg(feature = "asynk-postgres")]
use sqlx::Postgres;

#[cfg(feature = "asynk-mysql")]
use sqlx::MySql;
#[cfg(feature = "asynk-mysql")]
use sqlx::MySqlPool;

#[cfg(feature = "asynk-sqlite")]
use sqlx::Sqlite;
#[cfg(feature = "asynk-sqlite")]
use sqlx::SqlitePool;

#[cfg(test)]
use self::async_queue_tests::test_asynk_queue;

Expand Down Expand Up @@ -134,11 +152,51 @@ pub trait AsyncQueueable: Send {
/// .build();
/// ```
///
///
#[derive(Debug, Clone)]
pub(crate) enum InternalPool {
#[cfg(feature = "asynk-postgres")]
Pg(PgPool),
#[cfg(feature = "asynk-mysql")]
MySql(MySqlPool),
#[cfg(feature = "asynk-sqlite")]
Sqlite(SqlitePool),
}

impl InternalPool {
#[cfg(feature = "asynk-postgres")]
pub(crate) fn unwrap_pg_pool(&self) -> &PgPool {
match self {
InternalPool::Pg(pool) => pool,
#[allow(unreachable_patterns)]
_ => panic!("Not a PgPool!"),
}
}

#[cfg(feature = "asynk-mysql")]
pub(crate) fn unwrap_mysql_pool(&self) -> &MySqlPool {
match self {
InternalPool::MySql(pool) => pool,
#[allow(unreachable_patterns)]
_ => panic!("Not a MySqlPool!"),
}
}

#[cfg(feature = "asynk-sqlite")]
pub(crate) fn unwrap_sqlite_pool(&self) -> &SqlitePool {
match self {
InternalPool::Sqlite(pool) => pool,
#[allow(unreachable_patterns)]
_ => panic!("Not a SqlitePool!"),
}
}
}

#[derive(TypedBuilder, Debug, Clone)]
pub struct AsyncQueue {
#[builder(default=None, setter(skip))]
pool: Option<AnyPool>,
pool: Option<InternalPool>,
#[builder(setter(into))]
uri: String,
#[builder(setter(into))]
Expand All @@ -152,36 +210,61 @@ pub struct AsyncQueue {
#[cfg(test)]
use tokio::sync::Mutex;

#[cfg(test)]
#[cfg(all(test, feature = "asynk-postgres"))]
static ASYNC_QUEUE_POSTGRES_TEST_COUNTER: Mutex<u32> = Mutex::const_new(0);

#[cfg(test)]
#[cfg(all(test, feature = "asynk-sqlite"))]
static ASYNC_QUEUE_SQLITE_TEST_COUNTER: Mutex<u32> = Mutex::const_new(0);

#[cfg(test)]
#[cfg(all(test, feature = "asynk-mysql"))]
static ASYNC_QUEUE_MYSQL_TEST_COUNTER: Mutex<u32> = Mutex::const_new(0);

#[cfg(test)]
use sqlx::Executor;

#[cfg(test)]
#[cfg(all(test, feature = "asynk-sqlite"))]
use std::path::Path;

#[cfg(test)]
use std::env;

use super::backend_sqlx::BackendSqlX;

fn get_backend(_anykind: AnyKind) -> BackendSqlX {
match _anykind {
async fn get_backend(
kind: AnyKind,
_uri: &str,
_max_connections: u32,
) -> Result<(BackendSqlX, InternalPool), AsyncQueueError> {
match kind {
#[cfg(feature = "asynk-postgres")]
AnyKind::Postgres => BackendSqlX::Pg,
AnyKind::Postgres => {
let pool = PoolOptions::<Postgres>::new()
.max_connections(_max_connections)
.connect(_uri)
.await?;

Ok((BackendSqlX::Pg, InternalPool::Pg(pool)))
}
#[cfg(feature = "asynk-mysql")]
AnyKind::MySql => BackendSqlX::MySql,
AnyKind::MySql => {
let pool = PoolOptions::<MySql>::new()
.max_connections(_max_connections)
.connect(_uri)
.await?;

Ok((BackendSqlX::MySql, InternalPool::MySql(pool)))
}
#[cfg(feature = "asynk-sqlite")]
AnyKind::Sqlite => BackendSqlX::Sqlite,
AnyKind::Sqlite => {
let pool = PoolOptions::<Sqlite>::new()
.max_connections(_max_connections)
.connect(_uri)
.await?;

Ok((BackendSqlX::Sqlite, InternalPool::Sqlite(pool)))
}
#[allow(unreachable_patterns)]
_ => unreachable!(),
_ => panic!("Not a valid backend"),
}
}

Expand All @@ -199,22 +282,18 @@ impl AsyncQueue {
pub async fn connect(&mut self) -> Result<(), AsyncQueueError> {
//install_default_drivers();

let pool: AnyPool = PoolOptions::new()
.max_connections(self.max_pool_size)
.connect(&self.uri)
.await?;
let kind: AnyKind = self.uri.parse::<AnyConnectOptions>()?.kind();

let anykind = pool.any_kind();

self.backend = get_backend(anykind);
let (backend, pool) = get_backend(kind, &self.uri, self.max_pool_size).await?;

self.pool = Some(pool);
self.backend = backend;
self.connected = true;
Ok(())
}

async fn fetch_and_touch_task_query(
pool: &Pool<Any>,
pool: &InternalPool,
backend: &BackendSqlX,
task_type: Option<String>,
) -> Result<Option<Task>, AsyncQueueError> {
Expand Down Expand Up @@ -250,7 +329,7 @@ impl AsyncQueue {
}

async fn insert_task_query(
pool: &Pool<Any>,
pool: &InternalPool,
backend: &BackendSqlX,
metadata: &serde_json::Value,
task_type: &str,
Expand All @@ -271,7 +350,7 @@ impl AsyncQueue {
}

async fn insert_task_if_not_exist_query(
pool: &Pool<Any>,
pool: &InternalPool,
backend: &BackendSqlX,
metadata: &serde_json::Value,
task_type: &str,
Expand All @@ -292,7 +371,7 @@ impl AsyncQueue {
}

async fn schedule_task_query(
pool: &Pool<Any>,
pool: &InternalPool,
backend: &BackendSqlX,
task: &dyn AsyncRunnable,
) -> Result<Task, AsyncQueueError> {
Expand Down Expand Up @@ -553,7 +632,7 @@ impl AsyncQueueable for AsyncQueue {
}
}

#[cfg(test)]
#[cfg(all(test, feature = "asynk-postgres"))]
impl AsyncQueue {
/// Provides an AsyncQueue connected to its own DB
pub async fn test_postgres() -> Self {
Expand All @@ -575,7 +654,14 @@ impl AsyncQueue {
let create_query: &str = &format!("CREATE DATABASE {} WITH TEMPLATE fang;", db_name);
let delete_query: &str = &format!("DROP DATABASE IF EXISTS {};", db_name);

let mut conn = res.pool.as_mut().unwrap().acquire().await.unwrap();
let mut conn = res
.pool
.as_mut()
.unwrap()
.unwrap_pg_pool()
.acquire()
.await
.unwrap();

log::info!("Deleting database {db_name} ...");
conn.execute(delete_query).await.unwrap();
Expand All @@ -600,7 +686,10 @@ impl AsyncQueue {

res
}
}

#[cfg(all(test, feature = "asynk-sqlite"))]
impl AsyncQueue {
/// Provides an AsyncQueue connected to its own DB
pub async fn test_sqlite() -> Self {
dotenvy::dotenv().expect(".env file not found");
Expand Down Expand Up @@ -632,7 +721,10 @@ impl AsyncQueue {
res.connect().await.expect("fail to connect");
res
}
}

#[cfg(all(test, feature = "asynk-mysql"))]
impl AsyncQueue {
/// Provides an AsyncQueue connected to its own DB
pub async fn test_mysql() -> Self {
dotenvy::dotenv().expect(".env file not found");
Expand All @@ -657,7 +749,14 @@ impl AsyncQueue {

let delete_query: &str = &format!("DROP DATABASE IF EXISTS {};", db_name);

let mut conn = res.pool.as_mut().unwrap().acquire().await.unwrap();
let mut conn = res
.pool
.as_mut()
.unwrap()
.unwrap_mysql_pool()
.acquire()
.await
.unwrap();

log::info!("Deleting database {db_name} ...");
conn.execute(delete_query).await.unwrap();
Expand All @@ -684,11 +783,11 @@ impl AsyncQueue {
}
}

#[cfg(test)]
test_asynk_queue! {postgres, crate::AsyncQueue, crate::AsyncQueue::test_postgres()}
#[cfg(all(test, feature = "asynk-postgres"))]
test_asynk_queue! {postgres, crate::AsyncQueue,crate::AsyncQueue::test_postgres()}

#[cfg(test)]
test_asynk_queue! {sqlite, crate::AsyncQueue, crate::AsyncQueue::test_sqlite()}
#[cfg(all(test, feature = "asynk-sqlite"))]
test_asynk_queue! {sqlite, crate::AsyncQueue,crate::AsyncQueue::test_sqlite()}

#[cfg(test)]
#[cfg(all(test, feature = "asynk-mysql"))]
test_asynk_queue! {mysql, crate::AsyncQueue, crate::AsyncQueue::test_mysql()}
2 changes: 1 addition & 1 deletion fang/src/asynk/async_queue/async_queue_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ macro_rules! test_asynk_queue {
}

#[tokio::test]
async fn failed_task_query_test() {
async fn failed_task_test() {
let mut test: $q = $e.await;

let task = test.insert_task(&AsyncTask { number: 1 }).await.unwrap();
Expand Down
Loading

0 comments on commit 7ec0e79

Please sign in to comment.