Skip to content

Commit

Permalink
Daily pagerank calculation
Browse files Browse the repository at this point in the history
  • Loading branch information
dcadenas committed Sep 18, 2024
1 parent d669295 commit eb2e75d
Show file tree
Hide file tree
Showing 13 changed files with 299 additions and 11 deletions.
59 changes: 59 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ serde_json = "1.0.128"
thiserror = "1.0.63"
time = "0.3.36"
tokio = { version = "1.40.0", features = ["full", "test-util"] }
tokio-cron-scheduler = "0.11.0"
tokio-retry = "0.3.0"
tokio-util = { version = "0.7.11", features = ["rt"] }
tower-http = { version = "0.5.2", features = ["timeout", "trace"] }
tracing = "0.1.40"
Expand Down
4 changes: 3 additions & 1 deletion config/settings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@ followers:
# how often to flush the buffer to generate messages
flush_period_seconds: 60
# 30 minutes, so no more than 2 messages per hour
min_seconds_between_messages: 1800
min_seconds_between_messages: 1800
# Daily at midnight (six-field cron expression: sec min hour day-of-month month day-of-week)
pagerank_cron_expression: "0 0 0 * * *"
1 change: 1 addition & 0 deletions migrations/004_pagerank_index.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE INDEX user_pagerank_idx FOR (u:User) ON (u.pagerank);
49 changes: 49 additions & 0 deletions src/bin/pagerank.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use anyhow::{Context, Result};
use neo4rs::Graph;
use nos_followers::{
config::{Config, Settings},
repo::{Repo, RepoTrait},
};
use std::sync::Arc;
use tracing::{error, info};
use tracing_subscriber::{fmt, prelude::*, EnvFilter};

#[tokio::main]
async fn main() -> Result<()> {
// Initialize logging
tracing_subscriber::registry()
.with(fmt::layer())
.with(EnvFilter::from_default_env())
.init();

info!("PageRank updater started");

// Load configuration
let config = Config::new("config").context("Loading configuration failed")?;
let settings = config
.get::<Settings>()
.context("Retrieving settings from configuration failed")?;

// Connect to Neo4j
info!("Connecting to Neo4j at {}", settings.neo4j_uri);
let graph = Graph::new(
&settings.neo4j_uri,
&settings.neo4j_user,
&settings.neo4j_password,
)
.await
.context("Failed to connect to Neo4j")?;

// Initialize Repo
let repo = Arc::new(Repo::new(graph));

// Execute PageRank
info!("Executing PageRank update");
if let Err(e) = repo.update_pagerank().await {
error!("PageRank update failed: {:?}", e);
return Err(e).context("PageRank update encountered an error");
}

info!("PageRank update completed successfully");
Ok(())
}
1 change: 1 addition & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub struct Settings {
pub min_seconds_between_messages: NonZeroUsize,
pub tcp_importer_port: u16,
pub http_port: u16,
pub pagerank_cron_expression: String,
}

impl Configurable for Settings {
Expand Down
4 changes: 2 additions & 2 deletions src/domain/follows_differ.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ struct FollowsDiff {

pub struct FollowsDiffer<T, U>
where
T: RepoTrait + Sync + Send,
T: RepoTrait,
U: GetEventsOf + Sync + Send,
{
repo: Arc<T>,
Expand All @@ -32,7 +32,7 @@ where
#[async_trait]
impl<T, U> WorkerTask<Box<Event>> for FollowsDiffer<T, U>
where
T: RepoTrait + Sync + Send,
T: RepoTrait,
U: GetEventsOf + Sync + Send,
{
async fn call(&self, event: Box<Event>) -> Result<()> {
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ pub mod migrations;
pub mod publisher;
pub mod relay_subscriber;
pub mod repo;
pub mod scheduler;
pub mod tcp_importer;
pub mod worker_pool;
13 changes: 12 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use nos_followers::{
migrations::apply_migrations,
relay_subscriber::{create_client, start_nostr_subscription},
repo::Repo,
scheduler::start_scheduler,
tcp_importer::start_tcp_importer,
worker_pool::WorkerPool,
};
Expand Down Expand Up @@ -147,7 +148,7 @@ async fn start(settings: Settings) -> Result<()> {
start_nostr_subscription(
task_tracker.clone(),
shared_nostr_client,
[settings.relay].into(),
[settings.relay.clone()].into(),
filters,
event_sender.clone(),
cancellation_token.clone(),
Expand All @@ -162,6 +163,16 @@ async fn start(settings: Settings) -> Result<()> {
)
.await?;

info!("Starting scheduler");
start_scheduler(
task_tracker.clone(),
repo.clone(),
cancellation_token.clone(),
&settings,
)
.await
.context("Failed starting the scheduler")?;

HttpServer::start(
task_tracker.clone(),
settings.http_port,
Expand Down
5 changes: 5 additions & 0 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ pub fn retained_follow_changes() -> Gauge {
metrics::gauge!("retained_follow_changes")
}

/// Returns a handle to the `pagerank_seconds` gauge.
///
/// The gauge is described in `setup_metrics` as the seconds it takes to
/// calculate pagerank.
pub fn pagerank_seconds() -> Gauge {
metrics::gauge!("pagerank_seconds")
}

pub fn setup_metrics() -> Result<PrometheusHandle, anyhow::Error> {
describe_counter!(
"pubsub_messages",
Expand Down Expand Up @@ -113,6 +117,7 @@ pub fn setup_metrics() -> Result<PrometheusHandle, anyhow::Error> {
"retained_follow_changes",
"Number of retained follow changes"
);
describe_gauge!("pagerank_seconds", "Seconds it takes to calculate pagerank");

let prometheus_builder = PrometheusBuilder::new();
let prometheus_handle = prometheus_builder.install_recorder()?;
Expand Down
Loading

0 comments on commit eb2e75d

Please sign in to comment.