From c722c8a8db413144c099cb1a54d65b027a798f17 Mon Sep 17 00:00:00 2001 From: Daniel Cadenas Date: Thu, 26 Sep 2024 11:14:22 -0300 Subject: [PATCH] Fix burst logic --- config/settings.yml | 3 +- scripts/make_10_followers_for_2_followees.sh | 2 - src/config.rs | 3 +- src/domain/account_info.rs | 10 +- src/domain/followee_notification_factory.rs | 40 +++--- src/domain/notification_factory.rs | 129 +++++++------------ src/follow_change_handler.rs | 1 + src/publisher.rs | 10 +- src/rate_limiter.rs | 14 +- 9 files changed, 106 insertions(+), 106 deletions(-) diff --git a/config/settings.yml b/config/settings.yml index eff53d4..43f24cd 100644 --- a/config/settings.yml +++ b/config/settings.yml @@ -13,6 +13,7 @@ followers: google_project_id: "pub-verse-app" google_topic: "follow-changes" flush_period_seconds: 60 # how often to flush the buffer to generate messages - min_seconds_between_messages: 900 # 15 minutes + min_seconds_between_messages: 21600 # 6 hours, so no more than 4 messages per day, unless there are burst tokens available + burst: 10 # Number of messages that get replenished after 24 hours of no messages. When available, they override min_seconds_between_messages pagerank_cron_expression: "0 0 0 * * *" # Daily at midnight http_cache_seconds: 86400 # 24 hours \ No newline at end of file diff --git a/scripts/make_10_followers_for_2_followees.sh b/scripts/make_10_followers_for_2_followees.sh index 312d252..ca61e73 100755 --- a/scripts/make_10_followers_for_2_followees.sh +++ b/scripts/make_10_followers_for_2_followees.sh @@ -17,6 +17,4 @@ while :; do nak event -k 3 -t p="$followee" --sec $(nostrkeytool --gen) ws://localhost:7777 done - - sleep 3 done diff --git a/src/config.rs b/src/config.rs index 15212d3..2140b25 100644 --- a/src/config.rs +++ b/src/config.rs @@ -2,7 +2,7 @@ use config_rs::{Config as ConfigTree, ConfigError, Environment, File}; use serde::de::DeserializeOwned; use serde::Deserialize; use std::env; -use std::num::NonZeroUsize; +use std::num::{NonZeroU16, NonZeroUsize}; use tracing::info; #[derive(Debug, Deserialize)] @@ -24,6 +24,7 @@ pub struct Settings { pub http_port: u16, pub pagerank_cron_expression: String, pub http_cache_seconds: u32, + pub burst: NonZeroU16, } impl Configurable for Settings { diff --git a/src/domain/account_info.rs b/src/domain/account_info.rs index 94017c7..2577061 100644 --- a/src/domain/account_info.rs +++ b/src/domain/account_info.rs @@ -11,6 +11,7 @@ use serde::Serialize; use serde::Serializer; use std::fmt::Display; use std::sync::Arc; +use tokio::time::timeout; use tracing::{debug, error}; #[derive(Debug, PartialEq, Clone, PartialOrd, Ord, Eq)] @@ -292,9 +293,12 @@ struct Nip05Verifier; impl VerifyNip05 for Nip05Verifier { async fn verify_nip05(&self, public_key: &PublicKey, nip05_value: &str) -> bool { - nip05::verify(public_key, nip05_value, None) - .await - .unwrap_or(false) + timeout( + std::time::Duration::from_secs(2), + nip05::verify(public_key, nip05_value, None), + ) + .await + .map_or(false, |inner_result| inner_result.unwrap_or(false)) } } diff --git a/src/domain/followee_notification_factory.rs b/src/domain/followee_notification_factory.rs index 8025fa6..bbcf819 100644 --- a/src/domain/followee_notification_factory.rs +++ b/src/domain/followee_notification_factory.rs @@ -3,33 +3,34 @@ use crate::rate_limiter::RateLimiter; use nostr_sdk::PublicKey; use ordermap::OrderMap; use std::fmt::Debug; +use std::num::NonZeroUsize; use std::time::Duration; use tokio::time::Instant; +use tracing::info; type Follower = PublicKey; type Followee = PublicKey; static ONE_DAY: Duration = Duration::from_secs(24 * 60 * 60); -static TWELVE_HOURS: Duration = Duration::from_secs(12 * 60 * 60); /// Accumulates messages for a followee and flushes them in batches pub struct FolloweeNotificationFactory { pub follow_changes: OrderMap>, pub followee: Option, - min_time_between_messages: Duration, rate_limiter: RateLimiter, emptied_at: Option, } impl FolloweeNotificationFactory { - pub fn new(min_time_between_messages: Duration) -> Self { - // Rate limiter for 1 message every 12 hours, bursts of 10 - let capacity = 10.0; - let rate_limiter = RateLimiter::new(capacity, TWELVE_HOURS); + pub fn new(capacity: u16, min_seconds_between_messages: NonZeroUsize) -> Self { + // Rate limiter for 1 message every `min_seconds_between_messages`, with a + // burst of `capacity`. + let min_time_between_messages = + Duration::from_secs(min_seconds_between_messages.get() as u64); + let rate_limiter = RateLimiter::new(capacity as f64, min_time_between_messages); Self { follow_changes: OrderMap::with_capacity(100), followee: None, - min_time_between_messages, rate_limiter, emptied_at: None, } @@ -70,15 +71,6 @@ impl FolloweeNotificationFactory { pub fn should_flush(&mut self) -> bool { let now = Instant::now(); - let min_time_elapsed = match self.emptied_at { - Some(emptied_at) => now.duration_since(emptied_at) >= self.min_time_between_messages, - None => true, - }; - - if !min_time_elapsed { - return false; - } - let one_day_elapsed = match self.emptied_at { Some(emptied_at) => now.duration_since(emptied_at) >= ONE_DAY, None => true, @@ -100,7 +92,11 @@ impl FolloweeNotificationFactory { } pub fn should_delete(&mut self) -> bool { - self.follow_changes.is_empty() && self.should_flush() + // If it has been empty for a day, it's ok to delete + self.follow_changes.is_empty() + && self.emptied_at.map_or(true, |emptied_at| { + Instant::now().duration_since(emptied_at) >= ONE_DAY + }) } pub fn no_followers(&self) -> bool { @@ -130,6 +126,16 @@ impl FolloweeNotificationFactory { .collect(); let tokens_needed = messages.len() as f64; + + // Just to sample the rate limiter + if tokens_needed > 1.0 { + info!( + "Rate limiter for followee {} after flush: {}", + self.followee.unwrap(), + self.rate_limiter + ); + } + self.rate_limiter.overcharge(tokens_needed); return messages; diff --git a/src/domain/notification_factory.rs b/src/domain/notification_factory.rs index ee88035..bb5888e 100644 --- a/src/domain/notification_factory.rs +++ b/src/domain/notification_factory.rs @@ -5,7 +5,6 @@ use nostr_sdk::PublicKey; use ordermap::OrderMap; use std::collections::HashSet; use std::num::NonZeroUsize; -use tokio::time::Duration; use tracing::info; type Followee = PublicKey; @@ -15,21 +14,23 @@ type Followee = PublicKey; /// the results into `NotificationMessage` instances per followee. pub struct NotificationFactory { followee_maps: OrderMap, - min_time_between_messages: Duration, + burst: u16, + min_seconds_between_messages: NonZeroUsize, } impl NotificationFactory { - pub fn new(min_seconds_between_messages: NonZeroUsize) -> Self { + pub fn new(burst: u16, min_seconds_between_messages: usize) -> Self { info!( "One message in {} seconds allowed", min_seconds_between_messages ); + let min_seconds_between_messages = NonZeroUsize::new(min_seconds_between_messages).unwrap(); + Self { followee_maps: OrderMap::with_capacity(1_000), - min_time_between_messages: Duration::from_secs( - min_seconds_between_messages.get() as u64 - ), + burst, + min_seconds_between_messages, } } @@ -38,7 +39,7 @@ impl NotificationFactory { .followee_maps .entry(*follow_change.followee()) .or_insert_with_key(|_| { - FolloweeNotificationFactory::new(self.min_time_between_messages) + FolloweeNotificationFactory::new(self.burst, self.min_seconds_between_messages) }); followee_info.insert(follow_change); @@ -140,7 +141,6 @@ mod tests { use crate::domain::notification_message::MAX_FOLLOWERS_PER_BATCH; use assertables::*; use chrono::{DateTime, Utc}; - use nonzero_ext::nonzero; use nostr_sdk::prelude::Keys; use std::iter::repeat; use std::sync::LazyLock; @@ -152,7 +152,7 @@ mod tests { #[test] fn test_insert_follow_change() { - let mut notification_factory = NotificationFactory::new(nonzero!(60usize)); + let mut notification_factory = NotificationFactory::new(10, 60); let follower = Keys::generate().public_key(); let followee = Keys::generate().public_key(); @@ -172,7 +172,7 @@ mod tests { #[test] fn test_does_not_replace_with_older_change() { - let mut notification_factory = NotificationFactory::new(nonzero!(60usize)); + let mut notification_factory = NotificationFactory::new(10, 60); let follower = Keys::generate().public_key(); let followee = Keys::generate().public_key(); @@ -190,7 +190,7 @@ mod tests { #[test] fn test_insert_same_follower_different_followee() { - let mut notification_factory = NotificationFactory::new(nonzero!(60usize)); + let mut notification_factory = NotificationFactory::new(10, 60); let follower = Keys::generate().public_key(); let followee1 = Keys::generate().public_key(); @@ -215,7 +215,7 @@ mod tests { #[test] fn test_an_unfollow_cancels_a_follow() { - let mut notification_factory = NotificationFactory::new(nonzero!(60usize)); + let mut notification_factory = NotificationFactory::new(10, 60); let follower = Keys::generate().public_key(); let followee = Keys::generate().public_key(); @@ -232,7 +232,7 @@ mod tests { #[test] fn test_a_follow_cancels_an_unfollow() { - let mut notification_factory = NotificationFactory::new(nonzero!(60usize)); + let mut notification_factory = NotificationFactory::new(10, 60); let follower = Keys::generate().public_key(); let followee = Keys::generate().public_key(); @@ -248,9 +248,9 @@ mod tests { #[test] fn test_first_single_item_batch() { - let min_seconds_between_messages = nonzero!(3600usize); + let min_seconds_between_messages = 3600; - let mut notification_factory = NotificationFactory::new(min_seconds_between_messages); + let mut notification_factory = NotificationFactory::new(10, min_seconds_between_messages); let followee = Keys::generate().public_key(); @@ -265,11 +265,11 @@ mod tests { } #[tokio::test(start_paused = true)] - async fn test_no_second_message_before_min_seconds_elapse() { + async fn test_no_more_messages_after_burst_and_before_next_token() { // After one single follow change, we need to wait min period - let min_seconds_between_messages = nonzero!(3600usize); + let min_seconds_between_messages = 12 * 3600; // 12 hours - let mut notification_factory = NotificationFactory::new(min_seconds_between_messages); + let mut notification_factory = NotificationFactory::new(1, min_seconds_between_messages); let followee = Keys::generate().public_key(); @@ -283,16 +283,14 @@ mod tests { advance(Duration::from_secs(1)).await; - // We didn't wait min seconds + // We didn't wait min seconds, and the burst is set to 1, so we need to + // wait a full min_seconds_between_messages period to get the message let messages = notification_factory.flush(); assert_batches_eq(&messages, &[]); assert_eq!(notification_factory.follow_changes_len(), 2); // We pass the number of minimum seconds between messages so all are sent - advance(Duration::from_secs( - min_seconds_between_messages.get() as u64 - )) - .await; + advance(Duration::from_secs(min_seconds_between_messages as u64 + 1)).await; let messages = notification_factory.flush(); assert_batches_eq(&messages, &[(followee, &[change2, change3])]); } @@ -300,10 +298,10 @@ mod tests { #[tokio::test(start_paused = true)] async fn test_batch_sizes_after_min_seconds() { // After one single follow change, we need to wait min period - let min_seconds_between_messages = nonzero!(3600usize); - const MAX_FOLLOWERS_TRIPLED: usize = 3 * MAX_FOLLOWERS_PER_BATCH; + let min_seconds_between_messages = 12 * 3600; // 12 hours + const MAX_FOLLOWERS_TIMES_TEN: usize = 10 * MAX_FOLLOWERS_PER_BATCH; - let mut notification_factory = NotificationFactory::new(min_seconds_between_messages); + let mut notification_factory = NotificationFactory::new(10, min_seconds_between_messages); let followee = Keys::generate().public_key(); insert_new_follower(&mut notification_factory, followee); @@ -313,31 +311,25 @@ mod tests { assert_eq!(messages.len(), 1,); assert!(messages[0].is_single()); - repeat(()).take(MAX_FOLLOWERS_TRIPLED).for_each(|_| { + repeat(()).take(MAX_FOLLOWERS_TIMES_TEN).for_each(|_| { insert_new_follower(&mut notification_factory, followee); }); - // All inserted MAX_FOLLOWERS_TRIPLED changes wait for min period + // All inserted followers use the capacity of the rate limiter, so they are all sent let messages = notification_factory.flush(); - assert_eq!(messages.len(), 0); - assert_eq!( - notification_factory.follow_changes_len(), - MAX_FOLLOWERS_TRIPLED, - ); + assert_eq!(messages.len(), 10); + assert_eq!(notification_factory.follow_changes_len(), 0); // Before the max_retention time elapses.. advance(Duration::from_secs( - (min_seconds_between_messages.get() - 5) as u64, + (min_seconds_between_messages - 1) as u64, )) .await; // .. we insert another change insert_new_follower(&mut notification_factory, followee); - assert_eq!( - notification_factory.follow_changes_len(), - MAX_FOLLOWERS_TRIPLED + 1, - ); + assert_eq!(notification_factory.follow_changes_len(), 1); let messages = notification_factory.flush(); assert_eq!(messages.len(), 0); @@ -346,12 +338,9 @@ mod tests { advance(Duration::from_secs(6u64)).await; let messages = notification_factory.flush(); - // But it will be included in the next batch anyways, regardless of the min period - assert_eq!(messages.len(), 4); - assert_eq!(messages[0].len(), MAX_FOLLOWERS_PER_BATCH); - assert_eq!(messages[1].len(), MAX_FOLLOWERS_PER_BATCH); - assert_eq!(messages[2].len(), MAX_FOLLOWERS_PER_BATCH); - assert_eq!(messages[3].len(), 1); + // But it's retained, there are not tokens + assert_eq!(messages.len(), 0); + assert_eq!(notification_factory.follow_changes_len(), 1); // And another change arrives insert_new_follower(&mut notification_factory, followee); @@ -362,30 +351,25 @@ mod tests { let messages = notification_factory.flush(); // The new one is flushed, the old one is retained assert_eq!(messages.len(), 1); - assert_eq!(notification_factory.follow_changes_len(), 1); + assert_eq!(notification_factory.follow_changes_len(), 2); - advance(Duration::from_secs( - min_seconds_between_messages.get() as u64 + 1, - )) - .await; + advance(Duration::from_secs(min_seconds_between_messages as u64 + 1)).await; let messages = notification_factory.flush(); // Now all are flushed assert_eq!(messages.len(), 1); assert_eq!(notification_factory.follow_changes_len(), 0); - // But we keep the followee info for the time calculations for one more - // period, event if no changes are pending - assert_eq!(notification_factory.followees_len(), 1); + // But we keep the followee info for the time calculations for a day, + // event if no changes are pending + assert_eq!(notification_factory.followees_len(), 2); assert_eq!(notification_factory.follow_changes_len(), 0); - advance(Duration::from_secs( - min_seconds_between_messages.get() as u64 + 1, - )) - .await; + advance(Duration::from_secs(24 * 60 * 60_u64 + 1)).await; let messages = notification_factory.flush(); - // Now all is cleared + // Now all is cleared, no followee info is retained and next messages + // replenished the burst capacity assert_eq!(notification_factory.follow_changes_len(), 0); assert_eq!(notification_factory.followees_len(), 0); assert_eq!(messages.len(), 0); @@ -393,18 +377,14 @@ mod tests { #[tokio::test(start_paused = true)] async fn test_after_burst_of_10_messages_wait_12_hours() { - let min_seconds_between_messages = nonzero!(15 * 60usize); - let mut notification_factory = NotificationFactory::new(min_seconds_between_messages); + let min_seconds_between_messages = 12 * 3600; // 12 hours + let mut notification_factory = NotificationFactory::new(10, min_seconds_between_messages); let followee = Keys::generate().public_key(); // After 10 messages for _ in 0..10 { insert_new_follower(&mut notification_factory, followee); - advance(Duration::from_secs( - min_seconds_between_messages.get() as u64 - )) - .await; let messages = notification_factory.flush(); assert_eq!(messages.len(), 1); assert_eq!(notification_factory.follow_changes_len(), 0); @@ -437,10 +417,7 @@ mod tests { advance(Duration::from_secs(24 * 3600)).await; for _ in 0..10 { insert_new_follower(&mut notification_factory, followee); - advance(Duration::from_secs( - min_seconds_between_messages.get() as u64 - )) - .await; + advance(Duration::from_secs(min_seconds_between_messages as u64)).await; let messages = notification_factory.flush(); assert_eq!(messages.len(), 1); assert_eq!(notification_factory.follow_changes_len(), 0); @@ -450,7 +427,7 @@ mod tests { #[test] fn test_is_empty_and_len() { - let mut notification_factory = NotificationFactory::new(nonzero!(60usize)); + let mut notification_factory = NotificationFactory::new(10, 60); let followee1 = Keys::generate().public_key(); let followee2 = Keys::generate().public_key(); @@ -470,27 +447,21 @@ mod tests { #[tokio::test(start_paused = true)] async fn test_unfollows_are_not_sent() { - let min_seconds_between_messages = nonzero!(60usize); - let mut notification_factory = NotificationFactory::new(min_seconds_between_messages); + let min_seconds_between_messages = 60; + let mut notification_factory = NotificationFactory::new(10, min_seconds_between_messages); let followee = Keys::generate().public_key(); insert_new_follower(&mut notification_factory, followee); - advance(Duration::from_secs( - min_seconds_between_messages.get() as u64 + 1, - )) - .await; + advance(Duration::from_secs(min_seconds_between_messages as u64 + 1)).await; let messages = notification_factory.flush(); assert_eq!(messages.len(), 1); insert_new_unfollower(&mut notification_factory, followee); - advance(Duration::from_secs( - min_seconds_between_messages.get() as u64 + 1, - )) - .await; + advance(Duration::from_secs(min_seconds_between_messages as u64 + 1)).await; let messages = notification_factory.flush(); assert_eq!(messages.len(), 0); @@ -498,7 +469,7 @@ mod tests { #[test] fn test_flush_clears_map() { - let mut notification_factory = NotificationFactory::new(nonzero!(60usize)); + let mut notification_factory = NotificationFactory::new(10, 60); let follower = Keys::generate().public_key(); let followee = Keys::generate().public_key(); diff --git a/src/follow_change_handler.rs b/src/follow_change_handler.rs index e2b075a..0949b6e 100644 --- a/src/follow_change_handler.rs +++ b/src/follow_change_handler.rs @@ -40,6 +40,7 @@ where google_publisher_client, settings.flush_period_seconds, settings.min_seconds_between_messages, + settings.burst, ) .await?; diff --git a/src/publisher.rs b/src/publisher.rs index 26c63f3..ba455d0 100644 --- a/src/publisher.rs +++ b/src/publisher.rs @@ -2,6 +2,7 @@ use crate::domain::FollowChange; use crate::domain::NotificationFactory; use crate::domain::NotificationMessage; use futures::Future; +use std::num::NonZeroU16; use std::num::NonZeroUsize; use thiserror::Error; use tokio::select; @@ -42,11 +43,12 @@ impl Publisher { mut client: impl PublishEvents + Send + Sync + 'static, flush_period_seconds: NonZeroUsize, min_seconds_between_messages: NonZeroUsize, + burst: NonZeroU16, ) -> Result { let (publication_sender, mut publication_receiver) = mpsc::channel::>(20000); - let mut buffer = NotificationFactory::new(min_seconds_between_messages); + let mut buffer = NotificationFactory::new(burst.get(), min_seconds_between_messages.get()); tokio::spawn(async move { info!("Publishing messages every {} seconds", flush_period_seconds); @@ -162,12 +164,14 @@ mod tests { let cancellation_token = CancellationToken::new(); let flush_period_seconds = nonzero!(1usize); let min_seconds_between_messages = nonzero!(60usize); + let burst = nonzero!(10u16); let publisher = Publisher::create( cancellation_token.clone(), mock_client, flush_period_seconds, min_seconds_between_messages, + burst, ) .await .unwrap(); @@ -250,12 +254,14 @@ mod tests { let cancellation_token = CancellationToken::new(); let flush_period_seconds = nonzero!(1usize); let min_seconds_between_messages = nonzero!(60usize); + let burst = nonzero!(10u16); let publisher = Publisher::create( cancellation_token.clone(), mock_client, flush_period_seconds, min_seconds_between_messages, + burst, ) .await .unwrap(); @@ -314,12 +320,14 @@ mod tests { let cancellation_token = CancellationToken::new(); let flush_period_seconds = nonzero!(1usize); let min_seconds_between_messages = nonzero!(60usize); + let burst = nonzero!(10u16); let publisher = Publisher::create( cancellation_token.clone(), mock_client, flush_period_seconds, min_seconds_between_messages, + burst, ) .await .unwrap(); diff --git a/src/rate_limiter.rs b/src/rate_limiter.rs index b03d2ce..6800ab1 100644 --- a/src/rate_limiter.rs +++ b/src/rate_limiter.rs @@ -1,5 +1,5 @@ +use std::fmt::Display; use tokio::time::{Duration, Instant}; -use tracing::debug; /// Token bucket rate limiter. pub struct RateLimiter { @@ -33,7 +33,6 @@ impl RateLimiter { self.last_refill = now; let tokens_to_add = elapsed / self.refill_rate_per_sec; - debug!("Tokens to add: {}", tokens_to_add); self.tokens = (self.tokens + tokens_to_add).min(self.capacity); } @@ -70,6 +69,17 @@ impl RateLimiter { } } +impl Display for RateLimiter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let last_refill_seconds_ago = Instant::now().duration_since(self.last_refill).as_secs(); + write!( + f, + "RateLimiter {{ capacity: {}, tokens: {}, last_refill: {:?} secs ago}}", + self.capacity, self.tokens, last_refill_seconds_ago + ) + } +} + #[cfg(test)] mod tests { use super::*;