diff --git a/core/notifications/notifications.yml b/core/notifications/notifications.yml index 802aaa1f18..6c0dda80eb 100644 --- a/core/notifications/notifications.yml +++ b/core/notifications/notifications.yml @@ -5,13 +5,9 @@ # jwks_url: "http://localhost:4456/.well-known/jwks.json" # grpc_server: # port: 6685 -# mongo_import: -# execute_import: false app: executor: fcm: google_application_credentials_path: "./config/notifications/fake_service_account.json" - novu: - workflows: - circle_grew: circle-grew-workflow-id - threshold_reached: threshold-reached-workflow-id +# kratos_import: +# execute_import: true diff --git a/core/notifications/src/cli/config.rs b/core/notifications/src/cli/config.rs index ad30f5dba6b..8a9a2e8880 100644 --- a/core/notifications/src/cli/config.rs +++ b/core/notifications/src/cli/config.rs @@ -22,7 +22,7 @@ pub struct Config { #[serde(default = "default_tracing_config")] pub tracing: TracingConfig, #[serde(default)] - pub mongo_import: MongoImportConfig, + pub kratos_import: KratosImportConfig, } fn default_tracing_config() -> TracingConfig { @@ -33,7 +33,7 @@ fn default_tracing_config() -> TracingConfig { pub struct EnvOverride { pub db_con: String, - pub mongodb_connection: Option, + pub kratos_pg_con: Option, } impl Config { @@ -41,7 +41,7 @@ impl Config { path: Option>, EnvOverride { db_con, - mongodb_connection, + kratos_pg_con, }: EnvOverride, ) -> anyhow::Result { let mut config: Config = if let Some(path) = path { @@ -52,7 +52,7 @@ impl Config { Default::default() }; config.db.pg_con = db_con; - config.mongo_import.connection = mongodb_connection; + config.kratos_import.pg_con = kratos_pg_con; config.app.executor.fcm.load_creds()?; diff --git a/core/notifications/src/cli/mod.rs b/core/notifications/src/cli/mod.rs index a23385a8cc..5dbfcfe0d1 100644 --- a/core/notifications/src/cli/mod.rs +++ b/core/notifications/src/cli/mod.rs @@ -14,8 +14,8 @@ struct Cli { config: Option, #[clap(env = "PG_CON")] pg_con: String, - #[clap(env = "MONGODB_CON")] - mongodb_connection: Option, + #[clap(env = "KRATOS_PG_CON")] + kratos_pg_con: Option, } pub async fn run() -> anyhow::Result<()> { @@ -25,7 +25,7 @@ pub async fn run() -> anyhow::Result<()> { cli.config, EnvOverride { db_con: cli.pg_con, - mongodb_connection: cli.mongodb_connection, + kratos_pg_con: cli.kratos_pg_con, }, )?; @@ -41,9 +41,8 @@ async fn run_cmd(config: Config) -> anyhow::Result<()> { let mut handles = vec![]; let pool = db::init_pool(&config.db).await?; let app = crate::app::NotificationsApp::init(pool, config.app).await?; - if config.mongo_import.execute_import && config.mongo_import.connection.is_some() { - crate::data_import::import_user_notification_settings(app.clone(), config.mongo_import) - .await?; + if config.kratos_import.execute_import && config.kratos_import.pg_con.is_some() { + crate::data_import::import_email_addresses(app.clone(), config.kratos_import).await?; } println!("Starting notifications graphql server"); diff --git a/core/notifications/src/data_import/config.rs b/core/notifications/src/data_import/config.rs index 4275c002cd..1ca6826507 100644 --- a/core/notifications/src/data_import/config.rs +++ b/core/notifications/src/data_import/config.rs @@ -1,7 +1,7 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Default, Debug, Serialize, Deserialize)] -pub struct MongoImportConfig { +pub struct KratosImportConfig { pub execute_import: bool, - pub connection: Option, + pub pg_con: Option, } diff --git a/core/notifications/src/data_import/mod.rs b/core/notifications/src/data_import/mod.rs index b8d4c21e11..6141c08ac4 100644 --- a/core/notifications/src/data_import/mod.rs +++ b/core/notifications/src/data_import/mod.rs @@ -1,55 +1,48 @@ use anyhow::*; -use futures::stream::StreamExt; -use serde::Deserialize; +use sqlx::{Postgres, QueryBuilder, Row}; mod config; -mod mongodb; use crate::{app::NotificationsApp, primitives::*}; pub use config::*; -#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -struct MongoUser { - #[serde(default)] - user_id: Option, - #[serde(default)] - device_tokens: Vec, -} - -pub async fn import_user_notification_settings( +pub async fn import_email_addresses( app: NotificationsApp, - config: MongoImportConfig, + config: KratosImportConfig, ) -> anyhow::Result<()> { - let client = mongodb::get_client(config).await?; - let db = client.default_database().context("default database")?; - let users = db.collection::("users"); - let mut cursor = users.find(None, None).await?; + println!("EXECUTING EMAIL IMPORT"); + let pool = sqlx::postgres::PgPoolOptions::new() + .connect(&config.pg_con.expect("pg_con not set")) + .await?; + let mut last_email = String::new(); let mut total_users = 0; - while let Some(maybe_user) = cursor.next().await { - let user = match maybe_user { - Err(e) => { - println!("Error deserializing user: {:?}", e); - continue; - } - core::result::Result::Ok(user) => user, - }; - if let Some(user_id) = user.user_id { - if !user.device_tokens.is_empty() { - let user_id = GaloyUserId::from(user_id); - for device_token in user.device_tokens { - app.add_push_device_token(user_id.clone(), PushDeviceToken::from(device_token)) - .await?; - } - } + loop { + let mut query_builder: QueryBuilder = QueryBuilder::new( + r#"SELECT id, traits->>'email' AS email + FROM identities + WHERE traits->>'email' IS NOT NULL + AND traits->>'email' >"#, + ); + query_builder.push_bind(&last_email); + query_builder.push("ORDER BY traits->>'email' LIMIT 1000;"); + let query = query_builder.build(); + let res = query.fetch_all(&pool).await?; + if res.is_empty() { + break; } - - total_users += 1; - if total_users % 100 == 0 { - println!("{total_users} users synced"); + for row in res { + let id: uuid::Uuid = row.get("id"); + let email: String = row.get("email"); + app.update_email_address( + GaloyUserId::from(id.to_string()), + GaloyEmailAddress::from(email.clone()), + ) + .await?; + total_users += 1; + last_email = email; } + println!("First {total_users} synced"); } println!("SYNCING FINISHED: {total_users} users sycned"); - Ok(()) }