Skip to content

Commit

Permalink
Fix burst logic
Browse files Browse the repository at this point in the history
  • Loading branch information
dcadenas committed Sep 26, 2024
1 parent ad1920a commit c722c8a
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 106 deletions.
3 changes: 2 additions & 1 deletion config/settings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ followers:
google_project_id: "pub-verse-app"
google_topic: "follow-changes"
flush_period_seconds: 60 # how often to flush the buffer to generate messages
min_seconds_between_messages: 900 # 15 minutes
min_seconds_between_messages: 21600 # 6 hours, so no more than 4 messages per day, unless there are burst tokens available
burst: 10 # Number of messages that get replenished after 24 hours of no messages. When available, they override min_seconds_between_messages
pagerank_cron_expression: "0 0 0 * * *" # Daily at midnight
http_cache_seconds: 86400 # 24 hours
2 changes: 0 additions & 2 deletions scripts/make_10_followers_for_2_followees.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,4 @@ while :; do

nak event -k 3 -t p="$followee" --sec $(nostrkeytool --gen) ws://localhost:7777
done

sleep 3
done
3 changes: 2 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use config_rs::{Config as ConfigTree, ConfigError, Environment, File};
use serde::de::DeserializeOwned;
use serde::Deserialize;
use std::env;
use std::num::NonZeroUsize;
use std::num::{NonZeroU16, NonZeroUsize};
use tracing::info;

#[derive(Debug, Deserialize)]
Expand All @@ -24,6 +24,7 @@ pub struct Settings {
pub http_port: u16,
pub pagerank_cron_expression: String,
pub http_cache_seconds: u32,
pub burst: NonZeroU16,
}

impl Configurable for Settings {
Expand Down
10 changes: 7 additions & 3 deletions src/domain/account_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use serde::Serialize;
use serde::Serializer;
use std::fmt::Display;
use std::sync::Arc;
use tokio::time::timeout;
use tracing::{debug, error};

#[derive(Debug, PartialEq, Clone, PartialOrd, Ord, Eq)]
Expand Down Expand Up @@ -292,9 +293,12 @@ struct Nip05Verifier;

impl VerifyNip05 for Nip05Verifier {
async fn verify_nip05(&self, public_key: &PublicKey, nip05_value: &str) -> bool {
nip05::verify(public_key, nip05_value, None)
.await
.unwrap_or(false)
timeout(
std::time::Duration::from_secs(2),
nip05::verify(public_key, nip05_value, None),
)
.await
.map_or(false, |inner_result| inner_result.unwrap_or(false))
}
}

Expand Down
40 changes: 23 additions & 17 deletions src/domain/followee_notification_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,34 @@ use crate::rate_limiter::RateLimiter;
use nostr_sdk::PublicKey;
use ordermap::OrderMap;
use std::fmt::Debug;
use std::num::NonZeroUsize;
use std::time::Duration;
use tokio::time::Instant;
use tracing::info;

type Follower = PublicKey;
type Followee = PublicKey;

static ONE_DAY: Duration = Duration::from_secs(24 * 60 * 60);
static TWELVE_HOURS: Duration = Duration::from_secs(12 * 60 * 60);
/// Accumulates messages for a followee and flushes them in batches
pub struct FolloweeNotificationFactory {
pub follow_changes: OrderMap<Follower, Box<FollowChange>>,
pub followee: Option<Followee>,
min_time_between_messages: Duration,
rate_limiter: RateLimiter,
emptied_at: Option<Instant>,
}

impl FolloweeNotificationFactory {
pub fn new(min_time_between_messages: Duration) -> Self {
// Rate limiter for 1 message every 12 hours, bursts of 10
let capacity = 10.0;
let rate_limiter = RateLimiter::new(capacity, TWELVE_HOURS);
pub fn new(capacity: u16, min_seconds_between_messages: NonZeroUsize) -> Self {
// Rate limiter for 1 message every `min_seconds_between_messages`, with a
// burst of `capacity`.
let min_time_between_messages =
Duration::from_secs(min_seconds_between_messages.get() as u64);
let rate_limiter = RateLimiter::new(capacity as f64, min_time_between_messages);

Self {
follow_changes: OrderMap::with_capacity(100),
followee: None,
min_time_between_messages,
rate_limiter,
emptied_at: None,
}
Expand Down Expand Up @@ -70,15 +71,6 @@ impl FolloweeNotificationFactory {
pub fn should_flush(&mut self) -> bool {
let now = Instant::now();

let min_time_elapsed = match self.emptied_at {
Some(emptied_at) => now.duration_since(emptied_at) >= self.min_time_between_messages,
None => true,
};

if !min_time_elapsed {
return false;
}

let one_day_elapsed = match self.emptied_at {
Some(emptied_at) => now.duration_since(emptied_at) >= ONE_DAY,
None => true,
Expand All @@ -100,7 +92,11 @@ impl FolloweeNotificationFactory {
}

pub fn should_delete(&mut self) -> bool {
self.follow_changes.is_empty() && self.should_flush()
// If it has been empty for a day, it's ok to delete
self.follow_changes.is_empty()
&& self.emptied_at.map_or(true, |emptied_at| {
Instant::now().duration_since(emptied_at) >= ONE_DAY
})
}

pub fn no_followers(&self) -> bool {
Expand Down Expand Up @@ -130,6 +126,16 @@ impl FolloweeNotificationFactory {
.collect();

let tokens_needed = messages.len() as f64;

// Just to sample the rate limiter
if tokens_needed > 1.0 {
info!(
"Rate limiter for followee {} after flush: {}",
self.followee.unwrap(),
self.rate_limiter
);
}

self.rate_limiter.overcharge(tokens_needed);

return messages;
Expand Down
Loading

0 comments on commit c722c8a

Please sign in to comment.