Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(notifications): import kratos email #4012

Merged
merged 6 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions core/notifications/notifications.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,9 @@
# jwks_url: "http://localhost:4456/.well-known/jwks.json"
# grpc_server:
# port: 6685
# mongo_import:
# execute_import: false
app:
executor:
fcm:
google_application_credentials_path: "./config/notifications/fake_service_account.json"
novu:
workflows:
circle_grew: circle-grew-workflow-id
threshold_reached: threshold-reached-workflow-id
# kratos_import:
# execute_import: true
8 changes: 4 additions & 4 deletions core/notifications/src/cli/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub struct Config {
#[serde(default = "default_tracing_config")]
pub tracing: TracingConfig,
#[serde(default)]
pub mongo_import: MongoImportConfig,
pub kratos_import: KratosImportConfig,
}

fn default_tracing_config() -> TracingConfig {
Expand All @@ -33,15 +33,15 @@ fn default_tracing_config() -> TracingConfig {

pub struct EnvOverride {
pub db_con: String,
pub mongodb_connection: Option<String>,
pub kratos_pg_con: Option<String>,
}

impl Config {
pub fn from_path(
path: Option<impl AsRef<Path>>,
EnvOverride {
db_con,
mongodb_connection,
kratos_pg_con,
}: EnvOverride,
) -> anyhow::Result<Self> {
let mut config: Config = if let Some(path) = path {
Expand All @@ -52,7 +52,7 @@ impl Config {
Default::default()
};
config.db.pg_con = db_con;
config.mongo_import.connection = mongodb_connection;
config.kratos_import.pg_con = kratos_pg_con;

config.app.executor.fcm.load_creds()?;

Expand Down
11 changes: 5 additions & 6 deletions core/notifications/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ struct Cli {
config: Option<PathBuf>,
#[clap(env = "PG_CON")]
pg_con: String,
#[clap(env = "MONGODB_CON")]
mongodb_connection: Option<String>,
#[clap(env = "KRATOS_PG_CON")]
kratos_pg_con: Option<String>,
}

pub async fn run() -> anyhow::Result<()> {
Expand All @@ -25,7 +25,7 @@ pub async fn run() -> anyhow::Result<()> {
cli.config,
EnvOverride {
db_con: cli.pg_con,
mongodb_connection: cli.mongodb_connection,
kratos_pg_con: cli.kratos_pg_con,
},
)?;

Expand All @@ -41,9 +41,8 @@ async fn run_cmd(config: Config) -> anyhow::Result<()> {
let mut handles = vec![];
let pool = db::init_pool(&config.db).await?;
let app = crate::app::NotificationsApp::init(pool, config.app).await?;
if config.mongo_import.execute_import && config.mongo_import.connection.is_some() {
crate::data_import::import_user_notification_settings(app.clone(), config.mongo_import)
.await?;
if config.kratos_import.execute_import && config.kratos_import.pg_con.is_some() {
crate::data_import::import_email_addresses(app.clone(), config.kratos_import).await?;
}

println!("Starting notifications graphql server");
Expand Down
4 changes: 2 additions & 2 deletions core/notifications/src/data_import/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use serde::{Deserialize, Serialize};

#[derive(Clone, Default, Debug, Serialize, Deserialize)]
pub struct MongoImportConfig {
pub struct KratosImportConfig {
pub execute_import: bool,
pub connection: Option<String>,
pub pg_con: Option<String>,
}
71 changes: 32 additions & 39 deletions core/notifications/src/data_import/mod.rs
Original file line number Diff line number Diff line change
@@ -1,55 +1,48 @@
use anyhow::*;
use futures::stream::StreamExt;
use serde::Deserialize;
use sqlx::{Postgres, QueryBuilder, Row};

mod config;
mod mongodb;

use crate::{app::NotificationsApp, primitives::*};
pub use config::*;

#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct MongoUser {
#[serde(default)]
user_id: Option<String>,
#[serde(default)]
device_tokens: Vec<String>,
}

pub async fn import_user_notification_settings(
pub async fn import_email_addresses(
app: NotificationsApp,
config: MongoImportConfig,
config: KratosImportConfig,
) -> anyhow::Result<()> {
let client = mongodb::get_client(config).await?;
let db = client.default_database().context("default database")?;
let users = db.collection::<MongoUser>("users");
let mut cursor = users.find(None, None).await?;
println!("EXECUTING EMAIL IMPORT");
let pool = sqlx::postgres::PgPoolOptions::new()
.connect(&config.pg_con.expect("pg_con not set"))
.await?;
let mut last_email = String::new();
let mut total_users = 0;
while let Some(maybe_user) = cursor.next().await {
let user = match maybe_user {
Err(e) => {
println!("Error deserializing user: {:?}", e);
continue;
}
core::result::Result::Ok(user) => user,
};
if let Some(user_id) = user.user_id {
if !user.device_tokens.is_empty() {
let user_id = GaloyUserId::from(user_id);
for device_token in user.device_tokens {
app.add_push_device_token(user_id.clone(), PushDeviceToken::from(device_token))
.await?;
}
}
loop {
let mut query_builder: QueryBuilder<Postgres> = QueryBuilder::new(
r#"SELECT id, traits->>'email' AS email
FROM identities
WHERE traits->>'email' IS NOT NULL
AND traits->>'email' >"#,
);
query_builder.push_bind(&last_email);
query_builder.push("ORDER BY traits->>'email' LIMIT 1000;");
let query = query_builder.build();
let res = query.fetch_all(&pool).await?;
if res.is_empty() {
break;
}

total_users += 1;
if total_users % 100 == 0 {
println!("{total_users} users synced");
for row in res {
let id: uuid::Uuid = row.get("id");
let email: String = row.get("email");
app.update_email_address(
GaloyUserId::from(id.to_string()),
GaloyEmailAddress::from(email.clone()),
)
.await?;
total_users += 1;
last_email = email;
}
println!("First {total_users} synced");
}
println!("SYNCING FINISHED: {total_users} users sycned");

Ok(())
}
Loading