Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add vanish_requests stream subscriber #31

Merged
merged 2 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ neo4rs = "0.8.0"
nonzero_ext = "0.3.0"
nostr-sdk = "0.33.0"
ordermap = "0.5.3"
redis = { version = "0.27.4", features = ["connection-manager", "tls-rustls", "tls-rustls-webpki-roots", "tokio-rustls-comp"] }
rustls = { version = "0.23.12", features = ["ring"] }
serde = { version = "1.0.209", features = ["derive"] }
serde_json = "1.0.128"
Expand Down
4 changes: 0 additions & 4 deletions compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@ services:
context: .
target: final
environment:
- APP__followers__relay=ws://relay:7777
- APP__followers__neo4j_uri=db:7687
- APP__followers__neo4j_user=neo4j
- APP__followers__neo4j_password=mydevpassword
- APP__ENVIRONMENT=development
- GOOGLE_APPLICATION_CREDENTIALS=/app/gcloud/application_default_credentials.json
- RUST_LOG=nos_followers=debug
Expand Down
1 change: 1 addition & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct Settings {
pub pagerank_cron_expression: String,
pub http_cache_seconds: u32,
pub burst: NonZeroU16,
pub redis_url: String,
}

impl Configurable for Settings {
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ pub mod relay_subscriber;
pub mod repo;
pub mod scheduler;
pub mod tcp_importer;
pub mod vanish_subscriber;
pub mod worker_pool;
13 changes: 13 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use nos_followers::{
repo::{Repo, RepoTrait},
scheduler::start_scheduler,
tcp_importer::start_tcp_importer,
vanish_subscriber::{start_vanish_subscriber, RedisClient},
worker_pool::WorkerPool,
};
use nostr_sdk::prelude::*;
Expand Down Expand Up @@ -153,6 +154,18 @@ async fn start_server(settings: Settings) -> Result<()> {
.await
.context("Failed starting the scheduler")?;

// TODO: Now that we have redis we would use it to restore pending
// notifications between restarts and integrate it with cached crate
let redis_client = RedisClient::new(&settings.redis_url);

info!("Starting vanish subscriber");
start_vanish_subscriber(
task_tracker.clone(),
redis_client,
repo.clone(),
cancellation_token.clone(),
);

tokio::spawn(async move {
if let Err(e) = cancel_on_stop_signals(cancellation_token).await {
error!("Failed to listen stop signals: {}", e);
Expand Down
49 changes: 48 additions & 1 deletion src/repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl Repo {
}

// Default trait raises not implemented just to ease testing
pub trait RepoTrait: Sync + Send {
pub trait RepoTrait: Sync + Send + 'static {
/// Set the last contact list date seen for a user if it's newer than the stored value
fn update_last_contact_list_at(
&self,
Expand Down Expand Up @@ -107,6 +107,13 @@ pub trait RepoTrait: Sync + Send {
{
async { panic!("Not implemented") }
}

fn remove_pubkey(
&self,
_public_key: &PublicKey,
) -> impl std::future::Future<Output = Result<(), RepoError>> + std::marker::Send {
async { panic!("Not implemented") }
}
}

impl RepoTrait for Repo {
Expand Down Expand Up @@ -571,6 +578,43 @@ impl RepoTrait for Repo {
Err(e) => Err(RepoError::General(e)),
}
}

async fn remove_pubkey(&self, public_key: &PublicKey) -> Result<(), RepoError> {
let statement = r#"
MATCH (user:User {pubkey: $pubkey_val})

// Decrement follower_count of followees
OPTIONAL MATCH (user)-[:FOLLOWS]->(followee:User)
FOREACH (f IN CASE WHEN followee IS NOT NULL THEN [followee] ELSE [] END |
SET f.follower_count = CASE
WHEN f.follower_count > 0 THEN f.follower_count - 1
ELSE 0
END
)

// Decrement followee_count of followers
WITH user
OPTIONAL MATCH (follower:User)-[:FOLLOWS]->(user)
FOREACH (f IN CASE WHEN follower IS NOT NULL THEN [follower] ELSE [] END |
SET f.followee_count = CASE
WHEN f.followee_count > 0 THEN f.followee_count - 1
ELSE 0
END
)

WITH user
DETACH DELETE user
"#;

let query = query(statement).param("pubkey_val", public_key.to_hex());

self.graph
.run(query)
.await
.map_err(RepoError::RemovePubkey)?;

Ok(())
}
}

/// A function to read as DateTime<Utc> a value stored either as LocalDatetime or DateTime<Utc>
Expand Down Expand Up @@ -640,6 +684,9 @@ pub enum RepoError {

#[error("Failed to get pagerank: {0}")]
GetPageRank(neo4rs::Error),

#[error("Failed to remove pubkey: {0}")]
RemovePubkey(neo4rs::Error),
}

impl RepoError {
Expand Down
Loading
Loading