diff --git a/.sqlx/query-1012a599fd63f84527c0f197eb819aaedc6164d9395e886cf1862dbd8a108d42.json b/.sqlx/query-0ec9bab7790dfd2c3014010714bea4cd6c36fb4a5bd9e2c9e3bda23e981f8340.json similarity index 57% rename from .sqlx/query-1012a599fd63f84527c0f197eb819aaedc6164d9395e886cf1862dbd8a108d42.json rename to .sqlx/query-0ec9bab7790dfd2c3014010714bea4cd6c36fb4a5bd9e2c9e3bda23e981f8340.json index 64b3800..d111a98 100644 --- a/.sqlx/query-1012a599fd63f84527c0f197eb819aaedc6164d9395e886cf1862dbd8a108d42.json +++ b/.sqlx/query-0ec9bab7790dfd2c3014010714bea4cd6c36fb4a5bd9e2c9e3bda23e981f8340.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "INSERT INTO pluggy_items (user_id, external_item_id, created_at, last_updated_at)\n VALUES ($1, $2, $3, $4)", + "query": "INSERT INTO pluggy_items (user_id, external_item_id, created_at, last_updated_at, connector_name)\n VALUES ($1, $2, $3, $4, $5)", "describe": { "columns": [], "parameters": { @@ -8,10 +8,11 @@ "Int4", "Uuid", "Timestamptz", - "Timestamptz" + "Timestamptz", + "Text" ] }, "nullable": [] }, - "hash": "1012a599fd63f84527c0f197eb819aaedc6164d9395e886cf1862dbd8a108d42" + "hash": "0ec9bab7790dfd2c3014010714bea4cd6c36fb4a5bd9e2c9e3bda23e981f8340" } diff --git a/.sqlx/query-902b838c5d8d493b6f9baaed17dd4cfc72e567fe57d0b05245f71c1605744526.json b/.sqlx/query-902b838c5d8d493b6f9baaed17dd4cfc72e567fe57d0b05245f71c1605744526.json deleted file mode 100644 index b4a6d8c..0000000 --- a/.sqlx/query-902b838c5d8d493b6f9baaed17dd4cfc72e567fe57d0b05245f71c1605744526.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "select external_item_id from pluggy_items\n where user_id = $1\n order by created_at desc\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "external_item_id", - "type_info": "Uuid" - } - ], - "parameters": { - "Left": [ - "Int4" - ] - }, - "nullable": [ - false - ] - }, - "hash": "902b838c5d8d493b6f9baaed17dd4cfc72e567fe57d0b05245f71c1605744526" -} diff --git a/.sqlx/query-90a0782c5c17d4be5922646b28474082bf7f2a48d1e7bcce8ea8213594772b5d.json b/.sqlx/query-90a0782c5c17d4be5922646b28474082bf7f2a48d1e7bcce8ea8213594772b5d.json new file mode 100644 index 0000000..d24d6c0 --- /dev/null +++ b/.sqlx/query-90a0782c5c17d4be5922646b28474082bf7f2a48d1e7bcce8ea8213594772b5d.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE pluggy_items SET last_updated_at = $1\n WHERE id = $2 AND user_id = $3\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Timestamptz", + "Int4", + "Int4" + ] + }, + "nullable": [] + }, + "hash": "90a0782c5c17d4be5922646b28474082bf7f2a48d1e7bcce8ea8213594772b5d" +} diff --git a/.sqlx/query-e241cced3064c9233118b0c2ea4b99dd6aaa7f971bf08499f06ea574ee321ae2.json b/.sqlx/query-e241cced3064c9233118b0c2ea4b99dd6aaa7f971bf08499f06ea574ee321ae2.json new file mode 100644 index 0000000..478d120 --- /dev/null +++ b/.sqlx/query-e241cced3064c9233118b0c2ea4b99dd6aaa7f971bf08499f06ea574ee321ae2.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT DISTINCT ON (connector_name)\n external_item_id,\n connector_name\n FROM\n pluggy_items\n WHERE\n user_id = $1\n ORDER BY\n connector_name,\n created_at DESC;\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "external_item_id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "connector_name", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Int4" + ] + }, + "nullable": [ + false, + false + ] + }, + "hash": "e241cced3064c9233118b0c2ea4b99dd6aaa7f971bf08499f06ea574ee321ae2" +} diff --git a/.sqlx/query-fd0e2c835f6e1fcef1026c3c69a82aed1bef8da90b2469b1f5cdd6fdff56470a.json b/.sqlx/query-fd0e2c835f6e1fcef1026c3c69a82aed1bef8da90b2469b1f5cdd6fdff56470a.json new file mode 100644 index 0000000..5717d3b --- /dev/null +++ b/.sqlx/query-fd0e2c835f6e1fcef1026c3c69a82aed1bef8da90b2469b1f5cdd6fdff56470a.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT id\n FROM pluggy_items\n WHERE user_id = $1\n AND external_item_id = $2\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Int4", + "Uuid" + ] + }, + "nullable": [ + false + ] + }, + "hash": "fd0e2c835f6e1fcef1026c3c69a82aed1bef8da90b2469b1f5cdd6fdff56470a" +} diff --git a/migrations/20241216232151_origin-bank.down.sql b/migrations/20241216232151_origin-bank.down.sql index 249f1e5..d906c98 100644 --- a/migrations/20241216232151_origin-bank.down.sql +++ b/migrations/20241216232151_origin-bank.down.sql @@ -1,6 +1,7 @@ BEGIN; --- ALTER TABLE pluggy_items --- DROP COLUMN last_updated_at; +ALTER TABLE pluggy_items + DROP COLUMN connector_name, + DROP COLUMN last_updated_at; ALTER TABLE expenses ALTER COLUMN category SET NOT NULL; ALTER TABLE expenses diff --git a/migrations/20241216232151_origin-bank.up.sql b/migrations/20241216232151_origin-bank.up.sql index 7f8409a..8811b4d 100644 --- a/migrations/20241216232151_origin-bank.up.sql +++ b/migrations/20241216232151_origin-bank.up.sql @@ -7,5 +7,6 @@ ALTER TABLE expenses ALTER TABLE expenses ALTER COLUMN category DROP NOT NULL; ALTER TABLE pluggy_items - ADD COLUMN last_updated_at timestamptz NOT NULL; + ADD COLUMN last_updated_at timestamptz NOT NULL, + ADD COLUMN connector_name text NOT NULL; COMMIT; diff --git a/src/client/pluggy/transactions.rs b/src/client/pluggy/transactions.rs index ee1851e..26f5cfb 100644 --- a/src/client/pluggy/transactions.rs +++ b/src/client/pluggy/transactions.rs @@ -31,7 +31,7 @@ pub struct Transaction { pub date: OffsetDateTime, /// Category of the transaction (e.g. Restaurants, Education). /// See the Transaction Categorization section in our guides. - pub category: String, + pub category: Option, /// Id of the transaction category. /// Can be used to identify the category in the Categories endpoint category_id: Option, diff --git a/src/data/openfinance.rs b/src/data/openfinance.rs index a50a8a4..cdbbd48 100644 --- a/src/data/openfinance.rs +++ b/src/data/openfinance.rs @@ -23,6 +23,11 @@ pub fn router() -> Router> { .route("/api/refresh-transactions", get(refresh_transactions)) } +#[derive(Deserialize)] +struct ConnectTokenRequest { + maybe_item_id: Option, +} + #[derive(Serialize)] struct ConnectTokenResponse { connect_token: String, @@ -31,6 +36,7 @@ struct ConnectTokenResponse { async fn connect_token( auth_session: AuthSession, State(shared_state): State>, + Query(pluggyconnect_request): Query, ) -> Result, StatusCode> { use crate::client::pluggy::auth::CreateConnectTokenOutcome; @@ -38,21 +44,11 @@ async fn connect_token( panic!("user not logged in") }; - let maybe_item_id = - match crate::queries::pluggy_items::find_latest_for_user(&shared_state.pool, user.id).await - { - Ok(res) => res.map(|i| i.external_item_id), - Err(e) => { - tracing::error!(user.id, ?e, "error finding latest item_id for user"); - return Err(StatusCode::INTERNAL_SERVER_ERROR); - } - }; - match crate::client::pluggy::auth::create_connect_token( &shared_state.pluggy_api_key.lock().await, "https://webhook.fina.center".to_owned(), user.id, - maybe_item_id, + pluggyconnect_request.maybe_item_id, ) .await { @@ -77,6 +73,7 @@ async fn connect_token( #[derive(Deserialize)] struct ConnectRequest { item_id: Uuid, + connector_name: String, } async fn connect_success( @@ -88,27 +85,46 @@ async fn connect_success( panic!("user not logged in") }; - let maybe_item_id = - match crate::queries::pluggy_items::find_latest_for_user(&shared_state.pool, user.id).await + let maybe_item_id = match crate::queries::pluggy_items::find_item( + &shared_state.pool, + user.id, + pluggyconnect_request.item_id, + ) + .await + { + Ok(res) => res.map(|i| i.id), + Err(e) => { + tracing::error!(user.id, ?e, "error finding item_id for user"); + return StatusCode::INTERNAL_SERVER_ERROR; + } + }; + + if let Some(item_id) = maybe_item_id { + match crate::queries::pluggy_items::updated( + &shared_state.pool, + user.id, + item_id, + OffsetDateTime::now_utc(), + ) + .await { - Ok(res) => res.map(|i| i.external_item_id), + Ok(c) => { + if c.rows_affected() > 1 { + tracing::error!("i really need a macro that cancels the transaction"); + } + StatusCode::OK + } Err(e) => { - tracing::error!(user.id, ?e, "error finding latest item_id for user"); - return StatusCode::INTERNAL_SERVER_ERROR; + tracing::error!(?e, user.id, "error updating pluggy connect item"); + StatusCode::INTERNAL_SERVER_ERROR } - }; - if let Some(item_id) = maybe_item_id { - if item_id == pluggyconnect_request.item_id { - StatusCode::OK - } else { - tracing::error!(user.id, "updated not latest item_id?"); - StatusCode::PRECONDITION_FAILED } } else { match crate::features::openfinance::add_item( shared_state.pool.clone(), user.id, pluggyconnect_request.item_id, + pluggyconnect_request.connector_name, OffsetDateTime::now_utc(), ) .await diff --git a/src/features/expenses.rs b/src/features/expenses.rs index f906707..3d8ba51 100644 --- a/src/features/expenses.rs +++ b/src/features/expenses.rs @@ -35,73 +35,77 @@ pub async fn process_pluggy_expenses( db_pool: PgPool, pluggy_api_key: &str, ) -> anyhow::Result<()> { - let Some(item_id) = - crate::queries::pluggy_items::find_latest_for_user(&db_pool, user_id).await? - else { - tracing::error!("aaaaaaa should create pluggy item"); - todo!("create pluggy item") - }; - - let accounts = - crate::client::pluggy::account::list_accounts(pluggy_api_key, &item_id.external_item_id) - .await - .context("failed to list accounts from pluggy api")? - .results; - - for account in accounts { - let most_recent = - crate::queries::expenses::most_recent_for_account(&db_pool, user_id, account.id) - .await?; - - let crate::client::pluggy::transactions::ListTransactionsOutcome::Success(res) = - crate::client::pluggy::transactions::list_transactions( - pluggy_api_key, - &account.id, - most_recent - .as_ref() - .map(|t| t.date - time::Duration::days(1)) - .as_ref(), - ) - .await - .context("failed to list accounts from pluggy api")? - else { - tracing::error!("don't know yet"); - panic!("don't know yet") + let item_ids = + match crate::queries::pluggy_items::find_latests_for_user(&db_pool, user_id).await { + Ok(ids) => ids, + Err(e) => { + tracing::error!(?e, "database error on find latest user item_id"); + Vec::new() + } }; - let transactions = res.results; - tracing::info!("current transactions: {transactions:#?}"); - - for transaction in transactions { - if let Some(recent_timestamps) = most_recent.as_ref() { - if transaction.created_at <= recent_timestamps.external_created_at { - tracing::debug!( - ?user_id, - ?transaction.id, - ?transaction.created_at, - ?recent_timestamps.external_created_at, - "skipped inserting already inserted transaction" - ); - continue; + for item in item_ids { + let accounts = + crate::client::pluggy::account::list_accounts(pluggy_api_key, &item.external_item_id) + .await + .context("failed to list accounts from pluggy api")? + .results; + + for account in accounts { + let most_recent = + crate::queries::expenses::most_recent_for_account(&db_pool, user_id, account.id) + .await?; + + let crate::client::pluggy::transactions::ListTransactionsOutcome::Success(res) = + crate::client::pluggy::transactions::list_transactions( + pluggy_api_key, + &account.id, + most_recent + .as_ref() + .map(|t| t.date - time::Duration::days(1)) + .as_ref(), + ) + .await + .context("failed to list transactions from pluggy api")? + else { + tracing::error!("don't know yet"); + panic!("don't know yet") + }; + + let transactions = res.results; + tracing::info!("current transactions: {transactions:#?}"); + + for transaction in transactions { + if let Some(recent_timestamps) = most_recent.as_ref() { + if transaction.created_at <= recent_timestamps.external_created_at { + tracing::debug!( + ?user_id, + ?transaction.id, + ?transaction.created_at, + ?recent_timestamps.external_created_at, + "skipped inserting already inserted transaction" + ); + continue; + } } - } - let params = crate::queries::expenses::CreateParamsFromPluggy { - description: transaction.description, - price: transaction.amount, - category: None, - bank: Some("bank".to_owned()), - external_account_id: transaction.account_id, - external_id: transaction.id, - external_created_at: transaction.created_at, - is_essential: false, - date: transaction.date.date(), - now: OffsetDateTime::now_utc(), - }; - let res = - crate::queries::expenses::insert_from_pluggy(&db_pool, user_id, params).await?; - if res.rows_affected() > 1 { - tracing::error!("i really need a macro that cancels the transaction"); + let params = crate::queries::expenses::CreateParamsFromPluggy { + description: transaction.description, + price: transaction.amount, + category: None, + bank: Some("bank".to_owned()), + external_account_id: transaction.account_id, + external_id: transaction.id, + external_created_at: transaction.created_at, + is_essential: false, + date: transaction.date.date(), + now: OffsetDateTime::now_utc(), + }; + let res = + crate::queries::expenses::insert_from_pluggy(&db_pool, user_id, params).await?; + if res.rows_affected() > 1 { + tracing::error!("i really need a macro that cancels the transaction"); + } } } } diff --git a/src/features/openfinance.rs b/src/features/openfinance.rs index ada3e17..4ebe85f 100644 --- a/src/features/openfinance.rs +++ b/src/features/openfinance.rs @@ -6,9 +6,10 @@ pub async fn add_item( db_pool: PgPool, user_id: i32, item_id: Uuid, + connector_name: String, now: OffsetDateTime, ) -> Result<(), sqlx::Error> { - crate::queries::pluggy_items::create(&db_pool, user_id, item_id, now) + crate::queries::pluggy_items::create(&db_pool, user_id, item_id, now, connector_name) .await .map(|c| { if c.rows_affected() > 1 { diff --git a/src/hypermedia/router/expenses.rs b/src/hypermedia/router/expenses.rs index 4b5b478..a8cf55f 100644 --- a/src/hypermedia/router/expenses.rs +++ b/src/hypermedia/router/expenses.rs @@ -29,13 +29,42 @@ pub fn router() -> Router> { .route("/expenses/:uuid", get(find).put(update).delete(delete)) .route("/expenses/plots", get(plots)) .route("/expenses/pluggy-widget", get(pluggy_widget)) + .route( + "/expenses/update-connection/:uuid", + get(pluggy_widget_update), + ) } -async fn expenses_index(auth_session: AuthSession) -> impl IntoResponse { +async fn expenses_index( + auth_session: AuthSession, + State(shared_state): State>, +) -> impl IntoResponse { match auth_session.user { - Some(user) => ExpensesTemplate { - username: user.username, - ..Default::default() + Some(user) => { + let item_ids = match crate::queries::pluggy_items::find_latests_for_user( + &shared_state.pool, + user.id, + ) + .await + { + Ok(ids) => ids, + Err(e) => { + tracing::error!(?e, "database error on find latest user item_id"); + Vec::new() + } + }; + + ExpensesTemplate { + username: user.username, + connections: item_ids + .into_iter() + .map(|i| crate::templates::PluggyConnections { + id: i.external_item_id, + name: i.connector_name, + }) + .collect(), + ..Default::default() + } } .into_response_with_nonce(), None => StatusCode::UNAUTHORIZED.into_response(), @@ -307,9 +336,23 @@ async fn pluggy_widget( ) -> impl IntoResponse { crate::hypermedia::service::pluggy::widget( auth_session, - shared_state.pool.clone(), &shared_state.env, &shared_state.pluggy_api_key.lock().await, + None, + ) + .await +} + +async fn pluggy_widget_update( + auth_session: AuthSession, + State(shared_state): State>, + Path(uuid): Path, +) -> impl IntoResponse { + crate::hypermedia::service::pluggy::widget( + auth_session, + &shared_state.env, + &shared_state.pluggy_api_key.lock().await, + Some(uuid), ) .await } diff --git a/src/hypermedia/service/pluggy.rs b/src/hypermedia/service/pluggy.rs index ac95cb6..7955c7b 100644 --- a/src/hypermedia/service/pluggy.rs +++ b/src/hypermedia/service/pluggy.rs @@ -10,13 +10,12 @@ use crate::{ use askama_axum::IntoResponse; use axum::http::StatusCode; -use sqlx::PgPool; pub async fn widget( auth_session: AuthSession, - db_pool: PgPool, env: &Env, pluggy_api_key: &str, + maybe_item_id: Option, ) -> impl IntoResponse { let Some(user) = auth_session.user else { return (StatusCode::UNAUTHORIZED, [("HX-Redirect", "/auth/signin")]).into_response(); @@ -36,19 +35,6 @@ pub async fn widget( } }; - let maybe_item_id = - match crate::queries::pluggy_items::find_latest_for_user(&db_pool, user.id).await { - Ok(id) => id.map(|i| i.external_item_id), - Err(e) => { - tracing::error!(?e, "database error on find latest user item_id"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - PluggyWidgetModalErrorTemplate {}, - ) - .into_response(); - } - }; - let connect_token = match create_connect_token(pluggy_api_key, webhook_url, user.id, maybe_item_id).await { Ok(CreateConnectTokenOutcome::Success(connect_token)) => connect_token, diff --git a/src/queries/pluggy_items.rs b/src/queries/pluggy_items.rs index 0ae86b5..237e3ab 100644 --- a/src/queries/pluggy_items.rs +++ b/src/queries/pluggy_items.rs @@ -7,35 +7,89 @@ pub async fn create( user_id: i32, item_id: Uuid, now: OffsetDateTime, + connector_name: String, ) -> Result { sqlx::query!( - r#"INSERT INTO pluggy_items (user_id, external_item_id, created_at, last_updated_at) - VALUES ($1, $2, $3, $4)"#, + r#"INSERT INTO pluggy_items (user_id, external_item_id, created_at, last_updated_at, connector_name) + VALUES ($1, $2, $3, $4, $5)"#, user_id, item_id, now, - now + now, + connector_name ) .execute(conn) .await } +pub struct ItemFound { + pub id: i32, +} + +pub async fn find_item( + conn: impl PgExecutor<'_>, + user_id: i32, + external_item_id: Uuid, +) -> Result, sqlx::Error> { + sqlx::query_as!( + ItemFound, + r#" + SELECT id + FROM pluggy_items + WHERE user_id = $1 + AND external_item_id = $2 + "#, + user_id, + external_item_id, + ) + .fetch_optional(conn) + .await +} + pub struct UserPluggyItem { pub external_item_id: Uuid, + pub connector_name: String, } -pub async fn find_latest_for_user( +pub async fn find_latests_for_user( conn: impl PgExecutor<'_>, user_id: i32, -) -> Result, sqlx::Error> { +) -> Result, sqlx::Error> { sqlx::query_as!( UserPluggyItem, - r#"select external_item_id from pluggy_items - where user_id = $1 - order by created_at desc + r#" + SELECT DISTINCT ON (connector_name) + external_item_id, + connector_name + FROM + pluggy_items + WHERE + user_id = $1 + ORDER BY + connector_name, + created_at DESC; "#, user_id, ) - .fetch_optional(conn) + .fetch_all(conn) + .await +} + +pub async fn updated( + conn: impl PgExecutor<'_>, + user_id: i32, + item_id: i32, + now: OffsetDateTime, +) -> Result { + sqlx::query!( + r#" + UPDATE pluggy_items SET last_updated_at = $1 + WHERE id = $2 AND user_id = $3 + "#, + now, + user_id, + item_id, + ) + .execute(conn) .await } diff --git a/src/templates.rs b/src/templates.rs index 808c641..d8662bd 100644 --- a/src/templates.rs +++ b/src/templates.rs @@ -9,6 +9,11 @@ use strum::IntoEnumIterator; use time::{Date, OffsetDateTime}; use uuid::Uuid; +pub struct PluggyConnections { + pub id: Uuid, + pub name: String, +} + #[derive(Template)] #[template(path = "expenses.html")] /// The askama template for the expenses page. @@ -19,6 +24,7 @@ pub struct ExpensesTemplate { pub username: String, /// CSP nonce pub nonce: String, + pub connections: Vec, } impl Default for ExpensesTemplate { @@ -27,6 +33,7 @@ impl Default for ExpensesTemplate { expense_categories: ExpenseCategory::iter(), username: String::new(), nonce: String::new(), + connections: Vec::new(), }; } } @@ -42,6 +49,7 @@ impl ExpensesTemplate { expense_categories: self.expense_categories, username: self.username, nonce, + connections: self.connections, } .into_response(); diff --git a/templates/expenses.html b/templates/expenses.html index af6b3b5..8ee8735 100644 --- a/templates/expenses.html +++ b/templates/expenses.html @@ -19,11 +19,23 @@

{{ username }}'s Expenses

- +
+ + +
+ {% for connection in connections %} + + {% endfor %} +
+
diff --git a/templates/pluggy_connect_widget.html b/templates/pluggy_connect_widget.html index 964c4a2..d7d676f 100644 --- a/templates/pluggy_connect_widget.html +++ b/templates/pluggy_connect_widget.html @@ -11,7 +11,10 @@ headers: { 'Content-Type': 'application/json', }, - body: JSON.stringify({ item_id: itemData.item.id }) + body: JSON.stringify({ + item_id: itemData.item.id, + connector_name: itemData.item.connector.name + }) }) .then(response => response.json()) .then(data => {