Skip to content

Commit

Permalink
chore(notifications): import kratos email (#4012)
Browse files Browse the repository at this point in the history
* chore(core): sync email address to notifications

* chore(notifications): import email from kratos

* chore(notifications): remove redundant novu config

* chore(core): update email after verified

* chore(notifications): fix linting

* chore(notifications): use correct path for google application creds

---------

Co-authored-by: Sam Peters <[email protected]>
Co-authored-by: Vaibhav <[email protected]>
  • Loading branch information
3 people authored Feb 19, 2024
1 parent 43fa9c1 commit f67f2c6
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 57 deletions.
8 changes: 2 additions & 6 deletions core/notifications/notifications.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,9 @@
# jwks_url: "http://localhost:4456/.well-known/jwks.json"
# grpc_server:
# port: 6685
# mongo_import:
# execute_import: false
app:
executor:
fcm:
google_application_credentials_path: "./config/notifications/fake_service_account.json"
novu:
workflows:
circle_grew: circle-grew-workflow-id
threshold_reached: threshold-reached-workflow-id
# kratos_import:
# execute_import: true
8 changes: 4 additions & 4 deletions core/notifications/src/cli/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub struct Config {
#[serde(default = "default_tracing_config")]
pub tracing: TracingConfig,
#[serde(default)]
pub mongo_import: MongoImportConfig,
pub kratos_import: KratosImportConfig,
}

fn default_tracing_config() -> TracingConfig {
Expand All @@ -33,15 +33,15 @@ fn default_tracing_config() -> TracingConfig {

pub struct EnvOverride {
pub db_con: String,
pub mongodb_connection: Option<String>,
pub kratos_pg_con: Option<String>,
}

impl Config {
pub fn from_path(
path: Option<impl AsRef<Path>>,
EnvOverride {
db_con,
mongodb_connection,
kratos_pg_con,
}: EnvOverride,
) -> anyhow::Result<Self> {
let mut config: Config = if let Some(path) = path {
Expand All @@ -52,7 +52,7 @@ impl Config {
Default::default()
};
config.db.pg_con = db_con;
config.mongo_import.connection = mongodb_connection;
config.kratos_import.pg_con = kratos_pg_con;

config.app.executor.fcm.load_creds()?;

Expand Down
11 changes: 5 additions & 6 deletions core/notifications/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ struct Cli {
config: Option<PathBuf>,
#[clap(env = "PG_CON")]
pg_con: String,
#[clap(env = "MONGODB_CON")]
mongodb_connection: Option<String>,
#[clap(env = "KRATOS_PG_CON")]
kratos_pg_con: Option<String>,
}

pub async fn run() -> anyhow::Result<()> {
Expand All @@ -25,7 +25,7 @@ pub async fn run() -> anyhow::Result<()> {
cli.config,
EnvOverride {
db_con: cli.pg_con,
mongodb_connection: cli.mongodb_connection,
kratos_pg_con: cli.kratos_pg_con,
},
)?;

Expand All @@ -41,9 +41,8 @@ async fn run_cmd(config: Config) -> anyhow::Result<()> {
let mut handles = vec![];
let pool = db::init_pool(&config.db).await?;
let app = crate::app::NotificationsApp::init(pool, config.app).await?;
if config.mongo_import.execute_import && config.mongo_import.connection.is_some() {
crate::data_import::import_user_notification_settings(app.clone(), config.mongo_import)
.await?;
if config.kratos_import.execute_import && config.kratos_import.pg_con.is_some() {
crate::data_import::import_email_addresses(app.clone(), config.kratos_import).await?;
}

println!("Starting notifications graphql server");
Expand Down
4 changes: 2 additions & 2 deletions core/notifications/src/data_import/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use serde::{Deserialize, Serialize};

#[derive(Clone, Default, Debug, Serialize, Deserialize)]
pub struct MongoImportConfig {
pub struct KratosImportConfig {
pub execute_import: bool,
pub connection: Option<String>,
pub pg_con: Option<String>,
}
71 changes: 32 additions & 39 deletions core/notifications/src/data_import/mod.rs
Original file line number Diff line number Diff line change
@@ -1,55 +1,48 @@
use anyhow::*;
use futures::stream::StreamExt;
use serde::Deserialize;
use sqlx::{Postgres, QueryBuilder, Row};

mod config;
mod mongodb;

use crate::{app::NotificationsApp, primitives::*};
pub use config::*;

#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct MongoUser {
#[serde(default)]
user_id: Option<String>,
#[serde(default)]
device_tokens: Vec<String>,
}

pub async fn import_user_notification_settings(
pub async fn import_email_addresses(
app: NotificationsApp,
config: MongoImportConfig,
config: KratosImportConfig,
) -> anyhow::Result<()> {
let client = mongodb::get_client(config).await?;
let db = client.default_database().context("default database")?;
let users = db.collection::<MongoUser>("users");
let mut cursor = users.find(None, None).await?;
println!("EXECUTING EMAIL IMPORT");
let pool = sqlx::postgres::PgPoolOptions::new()
.connect(&config.pg_con.expect("pg_con not set"))
.await?;
let mut last_email = String::new();
let mut total_users = 0;
while let Some(maybe_user) = cursor.next().await {
let user = match maybe_user {
Err(e) => {
println!("Error deserializing user: {:?}", e);
continue;
}
core::result::Result::Ok(user) => user,
};
if let Some(user_id) = user.user_id {
if !user.device_tokens.is_empty() {
let user_id = GaloyUserId::from(user_id);
for device_token in user.device_tokens {
app.add_push_device_token(user_id.clone(), PushDeviceToken::from(device_token))
.await?;
}
}
loop {
let mut query_builder: QueryBuilder<Postgres> = QueryBuilder::new(
r#"SELECT id, traits->>'email' AS email
FROM identities
WHERE traits->>'email' IS NOT NULL
AND traits->>'email' >"#,
);
query_builder.push_bind(&last_email);
query_builder.push("ORDER BY traits->>'email' LIMIT 1000;");
let query = query_builder.build();
let res = query.fetch_all(&pool).await?;
if res.is_empty() {
break;
}

total_users += 1;
if total_users % 100 == 0 {
println!("{total_users} users synced");
for row in res {
let id: uuid::Uuid = row.get("id");
let email: String = row.get("email");
app.update_email_address(
GaloyUserId::from(id.to_string()),
GaloyEmailAddress::from(email.clone()),
)
.await?;
total_users += 1;
last_email = email;
}
println!("First {total_users} synced");
}
println!("SYNCING FINISHED: {total_users} users sycned");

Ok(())
}

0 comments on commit f67f2c6

Please sign in to comment.