-
Notifications
You must be signed in to change notification settings - Fork 143
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
chore(notifications): import email from kratos
- Loading branch information
1 parent
8df532e
commit 51ed6bd
Showing
5 changed files
with
46 additions
and
54 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,7 @@ | ||
use serde::{Deserialize, Serialize}; | ||
|
||
#[derive(Clone, Default, Debug, Serialize, Deserialize)] | ||
pub struct MongoImportConfig { | ||
pub struct KratosImportConfig { | ||
pub execute_import: bool, | ||
pub connection: Option<String>, | ||
pub pg_con: Option<String>, | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,55 +1,48 @@ | ||
use anyhow::*; | ||
use futures::stream::StreamExt; | ||
use serde::Deserialize; | ||
use sqlx::{Postgres, QueryBuilder, Row}; | ||
|
||
mod config; | ||
mod mongodb; | ||
|
||
use crate::{app::NotificationsApp, primitives::*}; | ||
pub use config::*; | ||
|
||
#[derive(Debug, Deserialize)] | ||
#[serde(rename_all = "camelCase")] | ||
struct MongoUser { | ||
#[serde(default)] | ||
user_id: Option<String>, | ||
#[serde(default)] | ||
device_tokens: Vec<String>, | ||
} | ||
|
||
pub async fn import_user_notification_settings( | ||
pub async fn import_email_addresses( | ||
app: NotificationsApp, | ||
config: MongoImportConfig, | ||
config: KratosImportConfig, | ||
) -> anyhow::Result<()> { | ||
let client = mongodb::get_client(config).await?; | ||
let db = client.default_database().context("default database")?; | ||
let users = db.collection::<MongoUser>("users"); | ||
let mut cursor = users.find(None, None).await?; | ||
println!("EXECUTING EMAIL IMPORT"); | ||
let pool = sqlx::postgres::PgPoolOptions::new() | ||
.connect(&config.pg_con.expect("pg_con not set")) | ||
.await?; | ||
let mut last_email = String::new(); | ||
let mut total_users = 0; | ||
while let Some(maybe_user) = cursor.next().await { | ||
let user = match maybe_user { | ||
Err(e) => { | ||
println!("Error deserializing user: {:?}", e); | ||
continue; | ||
} | ||
core::result::Result::Ok(user) => user, | ||
}; | ||
if let Some(user_id) = user.user_id { | ||
if !user.device_tokens.is_empty() { | ||
let user_id = GaloyUserId::from(user_id); | ||
for device_token in user.device_tokens { | ||
app.add_push_device_token(user_id.clone(), PushDeviceToken::from(device_token)) | ||
.await?; | ||
} | ||
} | ||
loop { | ||
let mut query_builder: QueryBuilder<Postgres> = QueryBuilder::new( | ||
r#"SELECT id, traits->>'email' AS email | ||
FROM identities | ||
WHERE traits->>'email' IS NOT NULL | ||
AND traits->>'email' >"#, | ||
); | ||
query_builder.push_bind(&last_email); | ||
query_builder.push("ORDER BY traits->>'email' LIMIT 1000;"); | ||
let query = query_builder.build(); | ||
let res = query.fetch_all(&pool).await?; | ||
if res.len() == 0 { | ||
break; | ||
} | ||
|
||
total_users += 1; | ||
if total_users % 100 == 0 { | ||
println!("{total_users} users synced"); | ||
for row in res { | ||
let id: uuid::Uuid = row.get("id"); | ||
let email: String = row.get("email"); | ||
app.update_email_address( | ||
GaloyUserId::from(id.to_string()), | ||
GaloyEmailAddress::from(email.clone()), | ||
) | ||
.await?; | ||
total_users += 1; | ||
last_email = email; | ||
} | ||
println!("First {total_users} synced"); | ||
} | ||
println!("SYNCING FINISHED: {total_users} users sycned"); | ||
|
||
Ok(()) | ||
} |