Skip to content

Commit

Permalink
Better FollowChange encapsulation
Browse files Browse the repository at this point in the history
  • Loading branch information
dcadenas committed Sep 14, 2024
1 parent 221a8ea commit d669295
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 43 deletions.
51 changes: 45 additions & 6 deletions src/domain/follow_change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ pub enum ChangeType {
/// A change in the follow relationship between two users.
#[derive(Clone, PartialOrd, Ord)]
pub struct FollowChange {
pub change_type: ChangeType,
pub followed_at: DateTime<Utc>,
pub follower: PublicKey,
pub friendly_follower: FriendlyId,
pub followee: PublicKey,
pub friendly_followee: FriendlyId,
change_type: ChangeType,
followed_at: DateTime<Utc>,
follower: PublicKey,
friendly_follower: FriendlyId,
followee: PublicKey,
friendly_followee: FriendlyId,
}

impl PartialEq for FollowChange {
fn eq(&self, other: &Self) -> bool {
self.change_type == other.change_type
Expand Down Expand Up @@ -58,10 +59,48 @@ impl FollowChange {
}
}

pub fn follower(&self) -> &PublicKey {
&self.follower
}

pub fn followee(&self) -> &PublicKey {
&self.followee
}

pub fn friendly_follower(&self) -> &FriendlyId {
&self.friendly_follower
}

pub fn set_friendly_follower(&mut self, name: FriendlyId) {
self.friendly_follower = name;
}

pub fn friendly_followee(&self) -> &FriendlyId {
&self.friendly_followee
}

pub fn set_friendly_followee(&mut self, name: FriendlyId) {
self.friendly_followee = name;
}

pub fn is_notifiable(&self) -> bool {
matches!(self.change_type, ChangeType::Followed)
}

pub fn is_older_than(&self, other: &Self) -> bool {
assert!(self.follower == other.follower);
assert!(self.followee == other.followee);

self.followed_at < other.followed_at
}

pub fn is_reverse_of(&self, other: &Self) -> bool {
assert!(self.follower == other.follower);
assert!(self.followee == other.followee);

self.change_type != other.change_type
}

#[cfg(test)]
pub fn with_friendly_follower(mut self, name: FriendlyId) -> Self {
self.friendly_follower = name;
Expand Down
17 changes: 9 additions & 8 deletions src/domain/followee_notification_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,30 @@ impl FolloweeNotificationFactory {
match &self.followee {
Some(followee) => {
assert_eq!(
followee, &follow_change.followee,
followee,
follow_change.followee(),
"Followee mismatch in add_follower_change"
);
}
None => {
self.followee = Some(follow_change.followee);
self.followee = Some(*follow_change.followee());
}
}

let follower = follow_change.follower;
let follower = follow_change.follower();

if let Some(existing_change) = self.follow_changes.get(&follower) {
if follow_change.followed_at < existing_change.followed_at {
if let Some(existing_change) = self.follow_changes.get(follower) {
if !existing_change.is_older_than(&follow_change) {
return;
}

if follow_change.change_type != existing_change.change_type {
self.follow_changes.remove(&follower);
if existing_change.is_reverse_of(&follow_change) {
self.follow_changes.remove(follower);
return;
}
}

self.follow_changes.insert(follower, follow_change);
self.follow_changes.insert(*follower, follow_change);
}

// This is basically a sliding window log rate limiter
Expand Down
2 changes: 1 addition & 1 deletion src/domain/notification_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl NotificationFactory {
pub fn insert(&mut self, follow_change: Box<FollowChange>) {
let followee_info = self
.followee_maps
.entry(follow_change.followee)
.entry(*follow_change.followee())
.or_insert_with_key(|_| {
FolloweeNotificationFactory::new(self.min_time_between_messages)
});
Expand Down
18 changes: 10 additions & 8 deletions src/domain/notification_message.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{ChangeType, FollowChange};
use super::FollowChange;
use crate::account_info::FriendlyId;
use nostr_sdk::prelude::*;
use ordermap::OrderSet;
Expand Down Expand Up @@ -40,24 +40,26 @@ impl NotificationMessage {
}

pub fn add(&mut self, follow_change: Box<FollowChange>) {
assert!(self.followee == follow_change.followee, "Followee mismatch");
assert!(
self.followee == *follow_change.followee(),
"Followee mismatch"
);

assert!(
self.len() < MAX_FOLLOWERS_PER_BATCH,
"Too many followers in a single message, can't exceed {}",
MAX_FOLLOWERS_PER_BATCH
);

assert_eq!(
follow_change.change_type,
ChangeType::Followed,
assert!(
follow_change.is_notifiable(),
"Only followed changes can be messaged"
);

self.follows.insert(follow_change.follower);
self.follows.insert(*follow_change.follower());

if self.len() == 1 {
self.friendly_follower = Some(follow_change.friendly_follower);
self.friendly_follower = Some(follow_change.friendly_follower().clone());
} else {
self.friendly_follower = None;
}
Expand Down Expand Up @@ -135,7 +137,7 @@ impl Debug for NotificationMessage {

impl From<Box<FollowChange>> for NotificationMessage {
fn from(change: Box<FollowChange>) -> Self {
let mut message = NotificationMessage::new(change.followee);
let mut message = NotificationMessage::new(*change.followee());
message.add(change);
message
}
Expand Down
24 changes: 12 additions & 12 deletions src/follow_change_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ impl<T: GetEventsOf> WorkerTask<Box<FollowChange>> for FollowChangeHandler<T> {
result = get_friendly_ids_from_db(&self.repo, &follow_change, self.timeout_secs) => result
);

follow_change.friendly_follower = friendly_follower;
follow_change.friendly_followee = friendly_followee;

debug!(
"Fetched friendly IDs for follow change from {} to {}, queueing for publication",
follow_change.friendly_follower, follow_change.friendly_followee
friendly_follower, friendly_followee
);

follow_change.set_friendly_follower(friendly_follower);
follow_change.set_friendly_followee(friendly_followee);

self.publisher.queue_publication(follow_change).await?;
Ok(())
}
Expand All @@ -86,8 +86,8 @@ async fn fetch_friendly_ids<T: GetEventsOf>(
follow_change: &FollowChange,
) -> (FriendlyId, FriendlyId) {
let (friendly_follower, friendly_followee) = tokio::join!(
refresh_friendly_id(repo, &nostr_client, &follow_change.follower),
refresh_friendly_id(repo, &nostr_client, &follow_change.followee),
refresh_friendly_id(repo, &nostr_client, follow_change.follower()),
refresh_friendly_id(repo, &nostr_client, follow_change.followee()),
);

(friendly_follower, friendly_followee)
Expand All @@ -104,24 +104,24 @@ async fn get_friendly_ids_from_db(
sleep(std::time::Duration::from_secs(timeout_secs)).await;

let (friendly_follower, friendly_followee) = tokio::join!(
repo.get_friendly_id(&follow_change.follower),
repo.get_friendly_id(&follow_change.followee)
repo.get_friendly_id(follow_change.follower()),
repo.get_friendly_id(follow_change.followee())
);

(
friendly_follower.ok().flatten().unwrap_or(
follow_change
.follower
.follower()
.to_bech32()
.map(FriendlyId::Npub)
.unwrap_or(FriendlyId::PublicKey(follow_change.follower.to_hex())),
.unwrap_or(FriendlyId::PublicKey(follow_change.follower().to_hex())),
),
friendly_followee.ok().flatten().unwrap_or(
follow_change
.followee
.followee()
.to_bech32()
.map(FriendlyId::Npub)
.unwrap_or(FriendlyId::PublicKey(follow_change.followee.to_hex())),
.unwrap_or(FriendlyId::PublicKey(follow_change.followee().to_hex())),
),
)
}
8 changes: 0 additions & 8 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@ pub fn followers_per_message() -> Histogram {
metrics::histogram!("followers_per_message")
}

pub fn unfollowers_per_message() -> Histogram {
metrics::histogram!("unfollowers_per_message")
}

pub fn retained_follow_changes() -> Gauge {
metrics::gauge!("retained_follow_changes")
}
Expand Down Expand Up @@ -113,10 +109,6 @@ pub fn setup_metrics() -> Result<PrometheusHandle, anyhow::Error> {
"followers_per_message",
"Number of followers per aggregated message"
);
describe_histogram!(
"unfollowers_per_message",
"Number of unfollowers per aggregated message"
);
describe_gauge!(
"retained_follow_changes",
"Number of retained follow changes"
Expand Down

0 comments on commit d669295

Please sign in to comment.