From e81b8dbe9cacbda6dab144b6fb11c7b50ba3c9b8 Mon Sep 17 00:00:00 2001 From: Daniel Cadenas Date: Tue, 15 Oct 2024 10:40:36 -0300 Subject: [PATCH 1/2] Add vanish_requests stream subscriber --- Cargo.lock | 69 ++++++++++++++++ Cargo.toml | 1 + compose.yaml | 4 - src/config.rs | 1 + src/lib.rs | 1 + src/main.rs | 13 +++ src/repo.rs | 49 ++++++++++- src/vanish_subscriber.rs | 172 +++++++++++++++++++++++++++++++++++++++ 8 files changed, 305 insertions(+), 5 deletions(-) create mode 100644 src/vanish_subscriber.rs diff --git a/Cargo.lock b/Cargo.lock index 00adb33..8a22ef9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -141,6 +141,12 @@ version = "1.0.87" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "10f00e1f6e58a40e807377c75c6a7f97bf9044fab57816f2414e6f5f4499d7b8" +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "assertables" version = "8.3.0" @@ -712,6 +718,20 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0" +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -1965,6 +1985,7 @@ dependencies = [ "nostr-sdk", "ordermap", "pretty_assertions", + "redis", "rustls", "serde", "serde_json", @@ -2653,6 +2674,37 @@ dependencies = [ "bitflags", ] +[[package]] +name = "redis" +version = "0.27.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc6baebe319ef5e4b470f248335620098d1c2e9261e995be05f56f719ca4bdb2" +dependencies = [ + "arc-swap", + "async-trait", + "bytes", + "combine", + "futures", + "futures-util", + "itoa", + "num-bigint", + "percent-encoding", + "pin-project-lite", + "rustls", + "rustls-native-certs", + "rustls-pemfile", + "rustls-pki-types", + "ryu", + "sha1_smol", + "socket2", + "tokio", + "tokio-retry2", + "tokio-rustls", + "tokio-util", + "url", + "webpki-roots", +] + [[package]] name = "redox_syscall" version = "0.5.3" @@ -3086,6 +3138,12 @@ dependencies = [ "digest", ] +[[package]] +name = "sha1_smol" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" + [[package]] name = "sha2" version = "0.10.8" @@ -3380,6 +3438,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-retry2" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "903934dba1c4c2f2e9cb460ef10b5695e0b0ecad3bf9ee7c8675e540c5e8b2d1" +dependencies = [ + "pin-project 1.1.5", + "rand", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.0" diff --git a/Cargo.toml b/Cargo.toml index f326cff..5d96c6f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ neo4rs = "0.8.0" nonzero_ext = "0.3.0" nostr-sdk = "0.33.0" ordermap = "0.5.3" +redis = { version = "0.27.4", features = ["connection-manager", "tls-rustls", "tls-rustls-webpki-roots", "tokio-rustls-comp"] } rustls = { version = "0.23.12", features = ["ring"] } serde = { version = "1.0.209", features = ["derive"] } serde_json = "1.0.128" diff --git a/compose.yaml b/compose.yaml index f038fe3..55acda4 100644 --- a/compose.yaml +++ b/compose.yaml @@ -5,10 +5,6 @@ services: context: . target: final environment: - - APP__followers__relay=ws://relay:7777 - - APP__followers__neo4j_uri=db:7687 - - APP__followers__neo4j_user=neo4j - - APP__followers__neo4j_password=mydevpassword - APP__ENVIRONMENT=development - GOOGLE_APPLICATION_CREDENTIALS=/app/gcloud/application_default_credentials.json - RUST_LOG=nos_followers=debug diff --git a/src/config.rs b/src/config.rs index 2140b25..2b24566 100644 --- a/src/config.rs +++ b/src/config.rs @@ -25,6 +25,7 @@ pub struct Settings { pub pagerank_cron_expression: String, pub http_cache_seconds: u32, pub burst: NonZeroU16, + pub redis_url: String, } impl Configurable for Settings { diff --git a/src/lib.rs b/src/lib.rs index 496011b..7070414 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,4 +11,5 @@ pub mod relay_subscriber; pub mod repo; pub mod scheduler; pub mod tcp_importer; +pub mod vanish_subscriber; pub mod worker_pool; diff --git a/src/main.rs b/src/main.rs index 66217c4..36eec37 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,6 +10,7 @@ use nos_followers::{ repo::{Repo, RepoTrait}, scheduler::start_scheduler, tcp_importer::start_tcp_importer, + vanish_subscriber::{start_vanish_subscriber, RedisClient}, worker_pool::WorkerPool, }; use nostr_sdk::prelude::*; @@ -153,6 +154,18 @@ async fn start_server(settings: Settings) -> Result<()> { .await .context("Failed starting the scheduler")?; + // TODO: Now that we have redis we would use it to restore pending + // notifications between restarts and integrate it with cached crate + let redis_client = RedisClient::new(&settings.redis_url); + + info!("Starting vanish subscriber"); + start_vanish_subscriber( + task_tracker.clone(), + redis_client, + repo.clone(), + cancellation_token.clone(), + ); + tokio::spawn(async move { if let Err(e) = cancel_on_stop_signals(cancellation_token).await { error!("Failed to listen stop signals: {}", e); diff --git a/src/repo.rs b/src/repo.rs index 5553a1d..4c1d796 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -18,7 +18,7 @@ impl Repo { } // Default trait raises not implemented just to ease testing -pub trait RepoTrait: Sync + Send { +pub trait RepoTrait: Sync + Send + 'static { /// Set the last contact list date seen for a user if it's newer than the stored value fn update_last_contact_list_at( &self, @@ -107,6 +107,13 @@ pub trait RepoTrait: Sync + Send { { async { panic!("Not implemented") } } + + fn remove_pubkey( + &self, + _public_key: &PublicKey, + ) -> impl std::future::Future> + std::marker::Send { + async { panic!("Not implemented") } + } } impl RepoTrait for Repo { @@ -571,6 +578,43 @@ impl RepoTrait for Repo { Err(e) => Err(RepoError::General(e)), } } + + async fn remove_pubkey(&self, public_key: &PublicKey) -> Result<(), RepoError> { + let statement = r#" + MATCH (user:User {pubkey: $pubkey_val}) + + // Decrement follower_count of followees + OPTIONAL MATCH (user)-[:FOLLOWS]->(followee:User) + FOREACH (f IN CASE WHEN followee IS NOT NULL THEN [followee] ELSE [] END | + SET f.follower_count = CASE + WHEN f.follower_count > 0 THEN f.follower_count - 1 + ELSE 0 + END + ) + + // Decrement followee_count of followers + WITH user + OPTIONAL MATCH (follower:User)-[:FOLLOWS]->(user) + FOREACH (f IN CASE WHEN follower IS NOT NULL THEN [follower] ELSE [] END | + SET f.followee_count = CASE + WHEN f.followee_count > 0 THEN f.followee_count - 1 + ELSE 0 + END + ) + + WITH user + DETACH DELETE user + "#; + + let query = query(statement).param("pubkey_val", public_key.to_hex()); + + self.graph + .run(query) + .await + .map_err(RepoError::RemovePubkey)?; + + Ok(()) + } } /// A function to read as DateTime a value stored either as LocalDatetime or DateTime @@ -640,6 +684,9 @@ pub enum RepoError { #[error("Failed to get pagerank: {0}")] GetPageRank(neo4rs::Error), + + #[error("Failed to remove pubkey: {0}")] + RemovePubkey(neo4rs::Error), } impl RepoError { diff --git a/src/vanish_subscriber.rs b/src/vanish_subscriber.rs new file mode 100644 index 0000000..e6a8e2c --- /dev/null +++ b/src/vanish_subscriber.rs @@ -0,0 +1,172 @@ +use crate::repo::RepoTrait; +use async_trait::async_trait; +use nostr_sdk::prelude::PublicKey; +use redis::{ + aio::ConnectionManager, + streams::{StreamKey, StreamReadOptions, StreamReadReply}, + AsyncCommands, RedisError, +}; +use std::error::Error; +use std::sync::Arc; +use tokio_util::{sync::CancellationToken, task::TaskTracker}; +use tracing::{error, info}; + +static BLOCK_MILLIS: usize = 5000; +static VANISH_STREAM_KEY: &str = "vanish_requests"; +static VANISH_LAST_ID_KEY: &str = "vanish_requests:followers_subscriber:last_id"; + +pub struct RedisClient { + client: redis::Client, +} + +#[async_trait] +pub trait RedisClientTrait: Send + Sync + 'static { + type Connection: RedisClientConnectionTrait; + async fn get_connection(&self) -> Result; +} + +impl RedisClient { + pub fn new(url: &str) -> Self { + let client = redis::Client::open(url).expect("Failed to create Redis client"); + RedisClient { client } + } +} + +#[async_trait] +impl RedisClientTrait for RedisClient { + type Connection = RedisClientConnection; + async fn get_connection(&self) -> Result { + let con = self.client.get_connection_manager().await?; + Ok(RedisClientConnection { con }) + } +} + +pub struct RedisClientConnection { + con: ConnectionManager, +} + +#[async_trait] +pub trait RedisClientConnectionTrait: Send + Sync + 'static { + async fn get(&mut self, key: &str) -> Result; + async fn set(&mut self, key: &str, value: String) -> Result<(), RedisError>; + async fn xread_options( + &mut self, + keys: &[&str], + ids: &[String], + opts: &StreamReadOptions, + ) -> Result; +} + +#[async_trait] +impl RedisClientConnectionTrait for RedisClientConnection { + async fn get(&mut self, key: &str) -> Result { + match self.con.get(key).await { + Ok(value) => Ok(value), + Err(_) => self.con.get(key).await, + } + } + + async fn set(&mut self, key: &str, value: String) -> Result<(), RedisError> { + match self.con.set(key, value.clone()).await { + Ok(()) => Ok(()), + Err(_) => self.con.set(key, value).await, + } + } + + async fn xread_options( + &mut self, + keys: &[&str], + ids: &[String], + opts: &StreamReadOptions, + ) -> Result { + match self.con.xread_options(keys, ids, opts).await { + Ok(reply) => Ok(reply), + Err(_) => self.con.xread_options(keys, ids, opts).await, + } + } +} + +pub fn start_vanish_subscriber( + tracker: TaskTracker, + redis_client: T, + repo: Arc, + cancellation_token: CancellationToken, +) { + tracker.spawn(async move { + let (mut con, mut last_id) = match get_connection_and_last_id(redis_client).await { + Ok(result) => result, + Err(e) => { + error!("Failed to get Redis connection: {}", e); + return; + } + }; + + let opts = StreamReadOptions::default().block(BLOCK_MILLIS); + + info!("Starting from last id processed: {}", last_id); + + loop { + tokio::select! { + _ = cancellation_token.cancelled() => { + break; + } + + result = async { + let reply: StreamReadReply = con + .xread_options(&[VANISH_STREAM_KEY], &[last_id.clone()], &opts) + .await?; + + for StreamKey { ids, .. } in reply.keys { + for stream_id in ids { + if stream_id.id == last_id { + continue; + } + + let Some(value) = stream_id.map.get("pubkey") else { + error!("Vanish request doesn't have a public key"); + continue; + }; + + let public_key = match value { + redis::Value::BulkString(bytes) => { + let public_key_string = String::from_utf8(bytes.clone())?; + + PublicKey::from_hex(public_key_string)? + } + _ => { + error!("Vanish request public key is not a bulk string"); + continue; + } + }; + + if let Err(e) = repo.remove_pubkey(&public_key).await { + error!("Failed to remove public key: {}", e); + } + + info!("Removed public key {} from vanish request", public_key.to_hex()); + + last_id = stream_id.id.clone(); + } + } + Ok::<(), Box>(()) + } => { + if let Err(e) = result { + error!("Error in Redis stream reader task: {}", e); + continue; + } + } + } + } + }); +} + +async fn get_connection_and_last_id( + redis_client: T, +) -> Result<(T::Connection, String), RedisError> { + let mut con = redis_client.get_connection().await?; + let last_id = con + .get(&VANISH_LAST_ID_KEY) + .await + .unwrap_or_else(|_| "0-0".to_string()); + Ok((con, last_id)) +} From 72ce30fcf768b8cc43debac8ed42ffaf1b1a7e64 Mon Sep 17 00:00:00 2001 From: Daniel Cadenas Date: Fri, 18 Oct 2024 10:14:49 -0300 Subject: [PATCH 2/2] Save last id --- src/vanish_subscriber.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/vanish_subscriber.rs b/src/vanish_subscriber.rs index e6a8e2c..208aecd 100644 --- a/src/vanish_subscriber.rs +++ b/src/vanish_subscriber.rs @@ -146,6 +146,12 @@ pub fn start_vanish_subscriber( info!("Removed public key {} from vanish request", public_key.to_hex()); last_id = stream_id.id.clone(); + + if let Err(e) = con.set(VANISH_LAST_ID_KEY, last_id.clone()).await { + error!("Failed to save last id: {}", e); + } else { + info!("Updating last vanish stream id processed to {}", last_id); + } } } Ok::<(), Box>(()) @@ -165,7 +171,7 @@ async fn get_connection_and_last_id( ) -> Result<(T::Connection, String), RedisError> { let mut con = redis_client.get_connection().await?; let last_id = con - .get(&VANISH_LAST_ID_KEY) + .get(VANISH_LAST_ID_KEY) .await .unwrap_or_else(|_| "0-0".to_string()); Ok((con, last_id))