From eb2e75da94fab995d42226f5f9704f49ae5b5d3c Mon Sep 17 00:00:00 2001 From: Daniel Cadenas Date: Tue, 17 Sep 2024 21:22:18 -0300 Subject: [PATCH] Daily pagerank calculation --- Cargo.lock | 59 ++++++++++++++++++ Cargo.toml | 2 + config/settings.yml | 4 +- migrations/004_pagerank_index.cypher | 1 + src/bin/pagerank.rs | 49 +++++++++++++++ src/config.rs | 1 + src/domain/follows_differ.rs | 4 +- src/lib.rs | 1 + src/main.rs | 13 +++- src/metrics.rs | 5 ++ src/repo.rs | 91 ++++++++++++++++++++++++++-- src/scheduler.rs | 76 +++++++++++++++++++++++ src/worker_pool.rs | 4 +- 13 files changed, 299 insertions(+), 11 deletions(-) create mode 100644 migrations/004_pagerank_index.cypher create mode 100644 src/bin/pagerank.rs create mode 100644 src/scheduler.rs diff --git a/Cargo.lock b/Cargo.lock index cf77aa5..60bdbcf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -710,6 +710,17 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "cron" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8c3e73077b4b4a6ab1ea5047c37c57aee77657bc8ecd6f29b0af082d0b0c07" +dependencies = [ + "chrono", + "nom", + "once_cell", +] + [[package]] name = "crossbeam-epoch" version = "0.9.18" @@ -1711,6 +1722,8 @@ dependencies = [ "thiserror", "time", "tokio", + "tokio-cron-scheduler", + "tokio-retry", "tokio-util", "tower-http", "tracing", @@ -1848,6 +1861,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" +[[package]] +name = "num-derive" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed3955f1a9c7c0c15e092f9c887db08b1fc683305fdf6eb6684f22555355e202" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.76", +] + [[package]] name = "num-integer" version = "0.1.46" @@ -3047,6 +3071,21 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "tokio-cron-scheduler" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7b9480125554f0ace1c3c3797a24b5cc56c6a7cd82c739db35fb54c4dc046f3" +dependencies = [ + "chrono", + "cron", + "num-derive", + "num-traits", + "tokio", + "tracing", + "uuid", +] + [[package]] name = "tokio-macros" version = "2.4.0" @@ -3058,6 +3097,17 @@ dependencies = [ "syn 2.0.76", ] +[[package]] +name = "tokio-retry" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" +dependencies = [ + "pin-project 1.1.5", + "rand", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.0" @@ -3432,6 +3482,15 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" +[[package]] +name = "uuid" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" +dependencies = [ + "getrandom", +] + [[package]] name = "valuable" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 16d01e2..0965c65 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,8 @@ serde_json = "1.0.128" thiserror = "1.0.63" time = "0.3.36" tokio = { version = "1.40.0", features = ["full", "test-util"] } +tokio-cron-scheduler = "0.11.0" +tokio-retry = "0.3.0" tokio-util = { version = "0.7.11", features = ["rt"] } tower-http = { version = "0.5.2", features = ["timeout", "trace"] } tracing = "0.1.40" diff --git a/config/settings.yml b/config/settings.yml index f76252f..cd15df7 100644 --- a/config/settings.yml +++ b/config/settings.yml @@ -15,4 +15,6 @@ followers: # how often to flush the buffer to generate messages flush_period_seconds: 60 # 30 minutes, so no more than 2 messages per hour - min_seconds_between_messages: 1800 \ No newline at end of file + min_seconds_between_messages: 1800 + # Daily at midnight + pagerank_cron_expression: "0 0 0 * * *" \ No newline at end of file diff --git a/migrations/004_pagerank_index.cypher b/migrations/004_pagerank_index.cypher new file mode 100644 index 0000000..a5a91de --- /dev/null +++ b/migrations/004_pagerank_index.cypher @@ -0,0 +1 @@ +CREATE INDEX user_pagerank_idx FOR (u:User) ON (u.pagerank); diff --git a/src/bin/pagerank.rs b/src/bin/pagerank.rs new file mode 100644 index 0000000..0ee934b --- /dev/null +++ b/src/bin/pagerank.rs @@ -0,0 +1,49 @@ +use anyhow::{Context, Result}; +use neo4rs::Graph; +use nos_followers::{ + config::{Config, Settings}, + repo::{Repo, RepoTrait}, +}; +use std::sync::Arc; +use tracing::{error, info}; +use tracing_subscriber::{fmt, prelude::*, EnvFilter}; + +#[tokio::main] +async fn main() -> Result<()> { + // Initialize logging + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::from_default_env()) + .init(); + + info!("PageRank updater started"); + + // Load configuration + let config = Config::new("config").context("Loading configuration failed")?; + let settings = config + .get::() + .context("Retrieving settings from configuration failed")?; + + // Connect to Neo4j + info!("Connecting to Neo4j at {}", settings.neo4j_uri); + let graph = Graph::new( + &settings.neo4j_uri, + &settings.neo4j_user, + &settings.neo4j_password, + ) + .await + .context("Failed to connect to Neo4j")?; + + // Initialize Repo + let repo = Arc::new(Repo::new(graph)); + + // Execute PageRank + info!("Executing PageRank update"); + if let Err(e) = repo.update_pagerank().await { + error!("PageRank update failed: {:?}", e); + return Err(e).context("PageRank update encountered an error"); + } + + info!("PageRank update completed successfully"); + Ok(()) +} diff --git a/src/config.rs b/src/config.rs index 3734173..fd1d5ce 100644 --- a/src/config.rs +++ b/src/config.rs @@ -22,6 +22,7 @@ pub struct Settings { pub min_seconds_between_messages: NonZeroUsize, pub tcp_importer_port: u16, pub http_port: u16, + pub pagerank_cron_expression: String, } impl Configurable for Settings { diff --git a/src/domain/follows_differ.rs b/src/domain/follows_differ.rs index 0bdd662..f95d827 100644 --- a/src/domain/follows_differ.rs +++ b/src/domain/follows_differ.rs @@ -21,7 +21,7 @@ struct FollowsDiff { pub struct FollowsDiffer where - T: RepoTrait + Sync + Send, + T: RepoTrait, U: GetEventsOf + Sync + Send, { repo: Arc, @@ -32,7 +32,7 @@ where #[async_trait] impl WorkerTask> for FollowsDiffer where - T: RepoTrait + Sync + Send, + T: RepoTrait, U: GetEventsOf + Sync + Send, { async fn call(&self, event: Box) -> Result<()> { diff --git a/src/lib.rs b/src/lib.rs index 1138d24..c7084d2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,5 +9,6 @@ pub mod migrations; pub mod publisher; pub mod relay_subscriber; pub mod repo; +pub mod scheduler; pub mod tcp_importer; pub mod worker_pool; diff --git a/src/main.rs b/src/main.rs index 54e7f96..37a15df 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,6 +8,7 @@ use nos_followers::{ migrations::apply_migrations, relay_subscriber::{create_client, start_nostr_subscription}, repo::Repo, + scheduler::start_scheduler, tcp_importer::start_tcp_importer, worker_pool::WorkerPool, }; @@ -147,7 +148,7 @@ async fn start(settings: Settings) -> Result<()> { start_nostr_subscription( task_tracker.clone(), shared_nostr_client, - [settings.relay].into(), + [settings.relay.clone()].into(), filters, event_sender.clone(), cancellation_token.clone(), @@ -162,6 +163,16 @@ async fn start(settings: Settings) -> Result<()> { ) .await?; + info!("Starting scheduler"); + start_scheduler( + task_tracker.clone(), + repo.clone(), + cancellation_token.clone(), + &settings, + ) + .await + .context("Failed starting the scheduler")?; + HttpServer::start( task_tracker.clone(), settings.http_port, diff --git a/src/metrics.rs b/src/metrics.rs index 16349ae..84e8857 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -65,6 +65,10 @@ pub fn retained_follow_changes() -> Gauge { metrics::gauge!("retained_follow_changes") } +pub fn pagerank_seconds() -> Gauge { + metrics::gauge!("pagerank_seconds") +} + pub fn setup_metrics() -> Result { describe_counter!( "pubsub_messages", @@ -113,6 +117,7 @@ pub fn setup_metrics() -> Result { "retained_follow_changes", "Number of retained follow changes" ); + describe_gauge!("pagerank_seconds", "Seconds it takes to calculate pagerank"); let prometheus_builder = PrometheusBuilder::new(); let prometheus_handle = prometheus_builder.install_recorder()?; diff --git a/src/repo.rs b/src/repo.rs index 13c317d..f5b2bb5 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -1,8 +1,7 @@ -use core::panic; - use crate::account_info::FriendlyId; use crate::domain::ContactListFollow; use chrono::{DateTime, NaiveDateTime, Utc}; +use core::panic; use neo4rs::{query, Graph}; use nostr_sdk::prelude::PublicKey; use thiserror::Error; @@ -18,7 +17,7 @@ impl Repo { } // Default trait raises not implemented just to ease testing -pub trait RepoTrait { +pub trait RepoTrait: Sync + Send { /// Set the last contact list date seen for a user if it's newer than the stored value. Returns the previous value fn maybe_update_last_contact_list_at( &self, @@ -72,6 +71,13 @@ pub trait RepoTrait { { async { panic!("Not implemented") } } + + /// Filters users, calculates pagerank, sets the value to each node, and drops the temporary graph. + fn update_pagerank( + &self, + ) -> impl std::future::Future> + std::marker::Send { + async { panic!("Not implemented") } + } } impl RepoTrait for Repo { @@ -281,6 +287,61 @@ impl RepoTrait for Repo { Ok(follows) } + + /// Filters users, calculates pagerank, sets the value to each node, and drops the temporary graph. + /// We only calculate PageRank for users with more than one followee and at least one follower. + async fn update_pagerank(&self) -> Result<(), RepoError> { + let graph_name = "filteredGraph"; + + let statements = r#" + CALL { + CALL gds.graph.exists($graph_name) + YIELD exists + WITH exists + WHERE exists + CALL gds.graph.drop($graph_name) + YIELD graphName + RETURN graphName + UNION ALL + RETURN $graph_name AS graphName + } + WITH graphName + MATCH (n:User) + WHERE n.followee_count > 1 AND n.follower_count > 0 + WITH gds.graph.project( + graphName, + n, + n, + { + relationshipType: 'FOLLOWS', + relationshipProperties: {} + } + ) AS graphProjection, graphName + CALL gds.pageRank.write( + graphName, + { + writeProperty: 'pagerank', + maxIterations: 20, + dampingFactor: 0.85 + } + ) + YIELD nodePropertiesWritten + WITH graphName + CALL gds.graph.drop(graphName) + YIELD graphName AS droppedGraphName + RETURN droppedGraphName AS graphName; + "#; + + // Combined Cypher query to drop, create, run PageRank, and drop again + let query = query(&statements).param("graph_name", graph_name); + + self.graph + .run(query) + .await + .map_err(RepoError::ExecutePageRank)?; + + Ok(()) + } } /// A function to read as DateTime a value stored either as LocalDatetime or DateTime @@ -297,26 +358,46 @@ fn parse_datetime(row: &neo4rs::Row, field: &str) -> Result #[derive(Error, Debug)] pub enum RepoError { - #[error("Failed to set first contact list date: {0}")] + #[error("Failed to set last contact list date: {0}")] MaybeSetLastContactListAt(neo4rs::Error), + #[error("Failed to add friendly_id: {0}")] SetFriendlyId(neo4rs::Error), + #[error("Failed to upsert follow: {0}")] UpsertFollow(neo4rs::Error), + #[error("Failed to delete follow: {0}")] DeleteFollow(neo4rs::Error), + #[error("Failed to get follows: {0}")] GetFollows(neo4rs::Error), + #[error("Failed to get follows: {0}")] GetFollowsPubkey(nostr_sdk::key::Error), + #[error("Failed to get friendly_id: {0}")] GetFriendlyId(neo4rs::Error), - #[error("Failed to deserialize: {source} ({context})")] + + #[error("Failed to create in-memory graph for PageRank: {0}")] + CreateGraph(neo4rs::Error), + + #[error("Failed to execute PageRank: {0}")] + ExecutePageRank(neo4rs::Error), + + #[error("Failed to drop in-memory graph: {0}")] + DropGraph(neo4rs::Error), + + #[error("Failed to drop existing in-memory graph: {0}")] + DropExistingGraph(neo4rs::Error), + + #[error("Deserialization failed: {source} ({context})")] Deserialization { source: neo4rs::DeError, context: String, }, } + impl RepoError { pub fn deserialization_with_context>( source: neo4rs::DeError, diff --git a/src/scheduler.rs b/src/scheduler.rs new file mode 100644 index 0000000..6349109 --- /dev/null +++ b/src/scheduler.rs @@ -0,0 +1,76 @@ +use crate::metrics; +use crate::{config::Settings, repo::RepoTrait}; +use anyhow::Result; +use std::sync::Arc; +use tokio::time::{Duration, Instant}; +use tokio_cron_scheduler::{Job, JobScheduler}; +use tokio_retry::strategy::{jitter, ExponentialBackoff}; +use tokio_retry::Retry; +use tokio_util::{sync::CancellationToken, task::TaskTracker}; +use tracing::{error, info}; + +pub async fn start_scheduler( + task_tracker: TaskTracker, + repo: Arc, + cancellation_token: CancellationToken, + settings: &Settings, +) -> Result<()> +where + T: RepoTrait + 'static, +{ + let mut sched = JobScheduler::new().await?; + + let cron_expression = settings.pagerank_cron_expression.as_str(); + + let repo_clone = Arc::clone(&repo); + let job = Job::new_async(cron_expression, move |_uuid, _l| { + let repo_inner = Arc::clone(&repo_clone); + Box::pin(async move { + info!("Starting scheduled PageRank update..."); + + let start_time = Instant::now(); + + let retry_strategy = ExponentialBackoff::from_millis(100) + .max_delay(Duration::from_secs(10)) + .map(jitter); + + let result = Retry::spawn(retry_strategy, || async { + if let Err(e) = repo_inner.update_pagerank().await { + error!("Failed to update PageRank: {:?}", e); + return Err(e); + } + + Ok(()) + }) + .await; + + let elapsed = start_time.elapsed(); + + match result { + Ok(_) => info!("PageRank updated successfully in {:?}", elapsed), + Err(e) => error!( + "Failed to update PageRank after retries in {:?}: {:?}", + elapsed, e + ), + } + + metrics::pagerank_seconds().set(elapsed.as_secs_f64()); + }) + })?; + + sched.add(job).await?; + sched.start().await?; + + task_tracker.spawn(async move { + tokio::select! { + _ = cancellation_token.cancelled() => { + info!("Scheduler task cancelled, shutting down scheduler."); + if let Err(e) = sched.shutdown().await { + error!("Failed to shut down scheduler: {:?}", e); + } + } + } + }); + + Ok(()) +} diff --git a/src/worker_pool.rs b/src/worker_pool.rs index 70d765a..2477bb3 100644 --- a/src/worker_pool.rs +++ b/src/worker_pool.rs @@ -10,7 +10,7 @@ use tokio::sync::{ use tokio::time::{timeout, Duration}; use tokio_util::sync::CancellationToken; use tokio_util::task::TaskTracker; -use tracing::{error, info, trace, warn}; +use tracing::{debug, error, info, trace, warn}; pub struct WorkerPool {} // A channel based worker pool that distributes work to a pool of workers. @@ -132,7 +132,7 @@ fn create_worker_task( loop { tokio::select! { _ = cancellation_token.cancelled() => { - info!("{}: Cancellation token is cancelled, stopping worker", worker_name); + debug!("{}: Cancellation token is cancelled, stopping worker", worker_name); break; }