From 67411e871aad87d86abd3f603f7680010857d684 Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Fri, 16 Feb 2024 14:55:08 +0100 Subject: [PATCH 1/6] chore(core): sync email address to notifications --- core/api/src/app/authentication/email.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/api/src/app/authentication/email.ts b/core/api/src/app/authentication/email.ts index 31e6f148a3..7ccad7dc85 100644 --- a/core/api/src/app/authentication/email.ts +++ b/core/api/src/app/authentication/email.ts @@ -26,6 +26,8 @@ export const addEmailToIdentity = async ({ const emailRegistrationId = await authServiceEmail.sendEmailWithCode({ email }) if (emailRegistrationId instanceof Error) return emailRegistrationId + await NotificationsService().updateEmailAddress({ userId, email }) + const user = await UsersRepository().findById(userId) if (user instanceof Error) return user From 849ad0ac240b387fcfd40691e1a8391c792ea897 Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Fri, 16 Feb 2024 16:23:14 +0100 Subject: [PATCH 2/6] chore(notifications): import email from kratos --- core/notifications/notifications.yml | 6 +- core/notifications/src/cli/config.rs | 8 +-- core/notifications/src/cli/mod.rs | 11 ++- core/notifications/src/data_import/config.rs | 4 +- core/notifications/src/data_import/mod.rs | 71 +++++++++----------- 5 files changed, 46 insertions(+), 54 deletions(-) diff --git a/core/notifications/notifications.yml b/core/notifications/notifications.yml index 802aaa1f18..02bace636c 100644 --- a/core/notifications/notifications.yml +++ b/core/notifications/notifications.yml @@ -5,13 +5,13 @@ # jwks_url: "http://localhost:4456/.well-known/jwks.json" # grpc_server: # port: 6685 -# mongo_import: -# execute_import: false app: executor: fcm: - google_application_credentials_path: "./config/notifications/fake_service_account.json" + google_application_credentials_path: "./dev/config/notifications/fake_service_account.json" novu: workflows: circle_grew: circle-grew-workflow-id threshold_reached: threshold-reached-workflow-id +# kratos_import: +# execute_import: true diff --git a/core/notifications/src/cli/config.rs b/core/notifications/src/cli/config.rs index ad30f5dba6b..8a9a2e8880 100644 --- a/core/notifications/src/cli/config.rs +++ b/core/notifications/src/cli/config.rs @@ -22,7 +22,7 @@ pub struct Config { #[serde(default = "default_tracing_config")] pub tracing: TracingConfig, #[serde(default)] - pub mongo_import: MongoImportConfig, + pub kratos_import: KratosImportConfig, } fn default_tracing_config() -> TracingConfig { @@ -33,7 +33,7 @@ fn default_tracing_config() -> TracingConfig { pub struct EnvOverride { pub db_con: String, - pub mongodb_connection: Option, + pub kratos_pg_con: Option, } impl Config { @@ -41,7 +41,7 @@ impl Config { path: Option>, EnvOverride { db_con, - mongodb_connection, + kratos_pg_con, }: EnvOverride, ) -> anyhow::Result { let mut config: Config = if let Some(path) = path { @@ -52,7 +52,7 @@ impl Config { Default::default() }; config.db.pg_con = db_con; - config.mongo_import.connection = mongodb_connection; + config.kratos_import.pg_con = kratos_pg_con; config.app.executor.fcm.load_creds()?; diff --git a/core/notifications/src/cli/mod.rs b/core/notifications/src/cli/mod.rs index a23385a8cc..5dbfcfe0d1 100644 --- a/core/notifications/src/cli/mod.rs +++ b/core/notifications/src/cli/mod.rs @@ -14,8 +14,8 @@ struct Cli { config: Option, #[clap(env = "PG_CON")] pg_con: String, - #[clap(env = "MONGODB_CON")] - mongodb_connection: Option, + #[clap(env = "KRATOS_PG_CON")] + kratos_pg_con: Option, } pub async fn run() -> anyhow::Result<()> { @@ -25,7 +25,7 @@ pub async fn run() -> anyhow::Result<()> { cli.config, EnvOverride { db_con: cli.pg_con, - mongodb_connection: cli.mongodb_connection, + kratos_pg_con: cli.kratos_pg_con, }, )?; @@ -41,9 +41,8 @@ async fn run_cmd(config: Config) -> anyhow::Result<()> { let mut handles = vec![]; let pool = db::init_pool(&config.db).await?; let app = crate::app::NotificationsApp::init(pool, config.app).await?; - if config.mongo_import.execute_import && config.mongo_import.connection.is_some() { - crate::data_import::import_user_notification_settings(app.clone(), config.mongo_import) - .await?; + if config.kratos_import.execute_import && config.kratos_import.pg_con.is_some() { + crate::data_import::import_email_addresses(app.clone(), config.kratos_import).await?; } println!("Starting notifications graphql server"); diff --git a/core/notifications/src/data_import/config.rs b/core/notifications/src/data_import/config.rs index 4275c002cd..1ca6826507 100644 --- a/core/notifications/src/data_import/config.rs +++ b/core/notifications/src/data_import/config.rs @@ -1,7 +1,7 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Default, Debug, Serialize, Deserialize)] -pub struct MongoImportConfig { +pub struct KratosImportConfig { pub execute_import: bool, - pub connection: Option, + pub pg_con: Option, } diff --git a/core/notifications/src/data_import/mod.rs b/core/notifications/src/data_import/mod.rs index b8d4c21e11..bc33a02758 100644 --- a/core/notifications/src/data_import/mod.rs +++ b/core/notifications/src/data_import/mod.rs @@ -1,55 +1,48 @@ use anyhow::*; -use futures::stream::StreamExt; -use serde::Deserialize; +use sqlx::{Postgres, QueryBuilder, Row}; mod config; -mod mongodb; use crate::{app::NotificationsApp, primitives::*}; pub use config::*; -#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -struct MongoUser { - #[serde(default)] - user_id: Option, - #[serde(default)] - device_tokens: Vec, -} - -pub async fn import_user_notification_settings( +pub async fn import_email_addresses( app: NotificationsApp, - config: MongoImportConfig, + config: KratosImportConfig, ) -> anyhow::Result<()> { - let client = mongodb::get_client(config).await?; - let db = client.default_database().context("default database")?; - let users = db.collection::("users"); - let mut cursor = users.find(None, None).await?; + println!("EXECUTING EMAIL IMPORT"); + let pool = sqlx::postgres::PgPoolOptions::new() + .connect(&config.pg_con.expect("pg_con not set")) + .await?; + let mut last_email = String::new(); let mut total_users = 0; - while let Some(maybe_user) = cursor.next().await { - let user = match maybe_user { - Err(e) => { - println!("Error deserializing user: {:?}", e); - continue; - } - core::result::Result::Ok(user) => user, - }; - if let Some(user_id) = user.user_id { - if !user.device_tokens.is_empty() { - let user_id = GaloyUserId::from(user_id); - for device_token in user.device_tokens { - app.add_push_device_token(user_id.clone(), PushDeviceToken::from(device_token)) - .await?; - } - } + loop { + let mut query_builder: QueryBuilder = QueryBuilder::new( + r#"SELECT id, traits->>'email' AS email + FROM identities + WHERE traits->>'email' IS NOT NULL + AND traits->>'email' >"#, + ); + query_builder.push_bind(&last_email); + query_builder.push("ORDER BY traits->>'email' LIMIT 1000;"); + let query = query_builder.build(); + let res = query.fetch_all(&pool).await?; + if res.len() == 0 { + break; } - - total_users += 1; - if total_users % 100 == 0 { - println!("{total_users} users synced"); + for row in res { + let id: uuid::Uuid = row.get("id"); + let email: String = row.get("email"); + app.update_email_address( + GaloyUserId::from(id.to_string()), + GaloyEmailAddress::from(email.clone()), + ) + .await?; + total_users += 1; + last_email = email; } + println!("First {total_users} synced"); } println!("SYNCING FINISHED: {total_users} users sycned"); - Ok(()) } From 748960dc2e7a091dc9f716dec514e7f08259c908 Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Fri, 16 Feb 2024 16:25:21 +0100 Subject: [PATCH 3/6] chore(notifications): remove redundant novu config --- core/notifications/notifications.yml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/core/notifications/notifications.yml b/core/notifications/notifications.yml index 02bace636c..c6fb5e01c2 100644 --- a/core/notifications/notifications.yml +++ b/core/notifications/notifications.yml @@ -9,9 +9,5 @@ app: executor: fcm: google_application_credentials_path: "./dev/config/notifications/fake_service_account.json" - novu: - workflows: - circle_grew: circle-grew-workflow-id - threshold_reached: threshold-reached-workflow-id # kratos_import: # execute_import: true From 2655e62ac1788b67e9aeea3b900f2253651cbddf Mon Sep 17 00:00:00 2001 From: Sam Peters Date: Fri, 16 Feb 2024 11:28:03 -0600 Subject: [PATCH 4/6] chore(core): update email after verified --- core/api/src/app/authentication/email.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/api/src/app/authentication/email.ts b/core/api/src/app/authentication/email.ts index 7ccad7dc85..31e6f148a3 100644 --- a/core/api/src/app/authentication/email.ts +++ b/core/api/src/app/authentication/email.ts @@ -26,8 +26,6 @@ export const addEmailToIdentity = async ({ const emailRegistrationId = await authServiceEmail.sendEmailWithCode({ email }) if (emailRegistrationId instanceof Error) return emailRegistrationId - await NotificationsService().updateEmailAddress({ userId, email }) - const user = await UsersRepository().findById(userId) if (user instanceof Error) return user From a4e77cc884e24357e9a77c7296d651fc5d1ee7cf Mon Sep 17 00:00:00 2001 From: Sam Peters Date: Fri, 16 Feb 2024 11:28:54 -0600 Subject: [PATCH 5/6] chore(notifications): fix linting --- core/notifications/src/data_import/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/notifications/src/data_import/mod.rs b/core/notifications/src/data_import/mod.rs index bc33a02758..6141c08ac4 100644 --- a/core/notifications/src/data_import/mod.rs +++ b/core/notifications/src/data_import/mod.rs @@ -27,7 +27,7 @@ pub async fn import_email_addresses( query_builder.push("ORDER BY traits->>'email' LIMIT 1000;"); let query = query_builder.build(); let res = query.fetch_all(&pool).await?; - if res.len() == 0 { + if res.is_empty() { break; } for row in res { From 5e6c886597a06972c42726bac402fc1396201951 Mon Sep 17 00:00:00 2001 From: Vaibhav Date: Sat, 17 Feb 2024 16:00:49 +0530 Subject: [PATCH 6/6] chore(notifications): use correct path for google application creds --- core/notifications/notifications.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/notifications/notifications.yml b/core/notifications/notifications.yml index c6fb5e01c2..6c0dda80eb 100644 --- a/core/notifications/notifications.yml +++ b/core/notifications/notifications.yml @@ -8,6 +8,6 @@ app: executor: fcm: - google_application_credentials_path: "./dev/config/notifications/fake_service_account.json" + google_application_credentials_path: "./config/notifications/fake_service_account.json" # kratos_import: # execute_import: true