Skip to content

Commit

Permalink
Block notifications from blocked users
Browse files Browse the repository at this point in the history
  • Loading branch information
hpeebles committed Jan 31, 2025
1 parent 9368e63 commit 6974741
Show file tree
Hide file tree
Showing 36 changed files with 455 additions and 40 deletions.
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ members = [
"backend/libraries/ts_export",
"backend/libraries/ts_export_macros",
"backend/libraries/types",
"backend/libraries/user_ids_set",
"backend/libraries/utils",
"backend/notification_pusher/aws",
"backend/notification_pusher/cli",
Expand Down
2 changes: 2 additions & 0 deletions backend/canisters/local_user_index/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ pub enum UserEvent {
NotifyChit(NotifyChit),
NotifyStreakInsurancePayment(UserCanisterStreakInsurancePayment),
NotifyStreakInsuranceClaim(UserCanisterStreakInsuranceClaim),
UserBlocked(UserId),
UserUnblocked(UserId),
}

#[derive(CandidType, Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq, Hash)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,11 @@ fn handle_event(user_id: UserId, event: UserEvent, state: &mut RuntimeState) {
new_days_claimed: claim.new_days_claimed,
})));
}
UserEvent::UserBlocked(blocked) => {
state.push_event_to_user_index(UserIndexEvent::UserBlocked(user_id, blocked));
}
UserEvent::UserUnblocked(unblocked) => {
state.push_event_to_user_index(UserIndexEvent::UserUnblocked(user_id, unblocked));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use candid::CandidType;
use notifications_index_canister::NotificationsIndexEvent;
use serde::{Deserialize, Serialize};

#[derive(CandidType, Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug)]
pub struct Args {
pub events: Vec<NotificationsIndexEvent>,
}

#[derive(CandidType, Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug)]
pub enum Response {
Success,
}
2 changes: 2 additions & 0 deletions backend/canisters/notifications/impl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ rand = { workspace = true }
serde = { workspace = true }
serde_bytes = { workspace = true }
stable_memory = { path = "../../../libraries/stable_memory" }
stable_memory_map = { path = "../../../libraries/stable_memory_map" }
tracing = { workspace = true }
types = { path = "../../../libraries/types" }
user_ids_set = { path = "../../../libraries/user_ids_set" }
utils = { path = "../../../libraries/utils" }
9 changes: 9 additions & 0 deletions backend/canisters/notifications/impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ use crate::model::subscriptions::Subscriptions;
use candid::Principal;
use canister_state_macros::canister_state;
use serde::{Deserialize, Serialize};
use stable_memory_map::UserIdsKeyPrefix;
use std::cell::RefCell;
use std::collections::{BTreeMap, HashSet};
use types::{BuildVersion, CanisterId, Cycles, NotificationEnvelope, TimestampMillis, Timestamped};
use user_ids_set::UserIdsSet;
use utils::env::Environment;
use utils::event_stream::EventStream;

Expand Down Expand Up @@ -71,10 +73,16 @@ struct Data {
pub cycles_dispenser_canister_id: CanisterId,
pub notifications: EventStream<NotificationEnvelope>,
pub subscriptions: Subscriptions,
#[serde(default = "blocked_users")]
pub blocked_users: UserIdsSet,
pub rng_seed: [u8; 32],
pub test_mode: bool,
}

fn blocked_users() -> UserIdsSet {
UserIdsSet::new(UserIdsKeyPrefix::new_for_blocked_users())
}

impl Data {
pub fn new(
notifications_index_canister_id: CanisterId,
Expand All @@ -90,6 +98,7 @@ impl Data {
cycles_dispenser_canister_id,
notifications: EventStream::default(),
subscriptions: Subscriptions::default(),
blocked_users: UserIdsSet::new(UserIdsKeyPrefix::new_for_blocked_users()),
rng_seed: [0; 32],
test_mode,
}
Expand Down
2 changes: 2 additions & 0 deletions backend/canisters/notifications/impl/src/lifecycle/init.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::lifecycle::{init_env, init_state};
use crate::memory::get_stable_memory_map_memory;
use crate::Data;
use canister_tracing_macros::trace;
use ic_cdk::init;
Expand All @@ -10,6 +11,7 @@ use utils::cycles::init_cycles_dispenser_client;
#[trace]
fn init(args: Args) {
canister_logger::init(args.test_mode);
stable_memory_map::init(get_stable_memory_map_memory());
init_cycles_dispenser_client(args.cycles_dispenser_canister_id, args.test_mode);

let env = init_env([0; 32]);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::lifecycle::{init_env, init_state};
use crate::memory::get_upgrades_memory;
use crate::memory::{get_stable_memory_map_memory, get_upgrades_memory};
use crate::Data;
use canister_logger::LogEntry;
use canister_tracing_macros::trace;
Expand All @@ -12,6 +12,8 @@ use utils::cycles::init_cycles_dispenser_client;
#[post_upgrade]
#[trace]
fn post_upgrade(args: Args) {
stable_memory_map::init(get_stable_memory_map_memory());

let memory = get_upgrades_memory();
let reader = get_reader(&memory);

Expand Down
7 changes: 6 additions & 1 deletion backend/canisters/notifications/impl/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use ic_stable_structures::{
use std::collections::BTreeMap;

const UPGRADES: MemoryId = MemoryId::new(0);
const STABLE_MEMORY_MAP: MemoryId = MemoryId::new(3);

pub type Memory = VirtualMemory<DefaultMemoryImpl>;

Expand All @@ -17,8 +18,12 @@ pub fn get_upgrades_memory() -> Memory {
get_memory(UPGRADES)
}

pub fn get_stable_memory_map_memory() -> Memory {
get_memory(STABLE_MEMORY_MAP)
}

pub fn memory_sizes() -> BTreeMap<u8, u64> {
(0u8..=0).map(|id| (id, get_memory(MemoryId::new(id)).size())).collect()
(0u8..=3).map(|id| (id, get_memory(MemoryId::new(id)).size())).collect()
}

fn get_memory(id: MemoryId) -> Memory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use canister_api_macros::update;
use canister_tracing_macros::trace;
use notifications_canister::c2c_push_notification::{Response::*, *};
use serde_bytes::ByteBuf;
use std::collections::HashSet;
use types::{CanPushNotificationsArgs, CanPushNotificationsResponse, CanisterId, NotificationEnvelope, UserId};

#[update(msgpack = true)]
Expand All @@ -25,7 +26,7 @@ async fn c2c_push_notification(args: Args) -> Response {
_ => {}
}

mutate_state(|state| c2c_push_notification_impl(args.recipients, args.notification_bytes, state))
mutate_state(|state| c2c_push_notification_impl(args.sender, args.recipients, args.notification_bytes, state))
}

enum CanPushNotificationsResult {
Expand All @@ -48,10 +49,19 @@ fn can_push_notifications(args: &Args, state: &RuntimeState) -> CanPushNotificat
CanPushNotificationsResult::Blocked
}

fn c2c_push_notification_impl(recipients: Vec<UserId>, notification_bytes: ByteBuf, state: &mut RuntimeState) -> Response {
fn c2c_push_notification_impl(
sender: Option<UserId>,
recipients: Vec<UserId>,
notification_bytes: ByteBuf,
state: &mut RuntimeState,
) -> Response {
let users_who_have_blocked_sender: HashSet<_> = sender
.map(|s| state.data.blocked_users.all_linked_users(s))
.unwrap_or_default();

let filtered_recipients: Vec<_> = recipients
.into_iter()
.filter(|u| state.data.subscriptions.any_for_user(u))
.filter(|u| state.data.subscriptions.any_for_user(u) && !users_who_have_blocked_sender.contains(u))
.collect();

if !filtered_recipients.is_empty() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use canister_api_macros::update;
use canister_tracing_macros::trace;
use notifications_canister::c2c_sync_index::{Response::*, *};
use notifications_index_canister::NotificationsIndexEvent;
use stable_memory_map::StableMemoryMap;

#[update(guard = "caller_is_notifications_index", msgpack = true)]
#[trace]
Expand All @@ -23,6 +24,12 @@ fn c2c_sync_index_impl(args: Args, state: &mut RuntimeState) -> Response {
NotificationsIndexEvent::AllSubscriptionsRemoved(u) => {
state.data.subscriptions.remove_all(u);
}
NotificationsIndexEvent::UserBlocked(user_id, blocked) => {
state.data.blocked_users.insert((blocked, user_id), ());
}
NotificationsIndexEvent::UserUnblocked(user_id, unblocked) => {
state.data.blocked_users.remove(&(unblocked, user_id));
}
}
}
Success
Expand Down
15 changes: 11 additions & 4 deletions backend/canisters/notifications_index/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,31 @@ pub use lifecycle::*;
pub use queries::*;
pub use updates::*;

use candid::CandidType;
use serde::{Deserialize, Serialize};
use types::{SubscriptionInfo, UserId};

#[derive(CandidType, Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum NotificationsIndexEvent {
SubscriptionAdded(SubscriptionAdded),
SubscriptionRemoved(SubscriptionRemoved),
AllSubscriptionsRemoved(UserId),
UserBlocked(UserId, UserId),
UserUnblocked(UserId, UserId),
}

#[derive(CandidType, Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum UserIndexEvent {
UserBlocked(UserId, UserId),
UserUnblocked(UserId, UserId),
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct SubscriptionAdded {
pub user_id: UserId,
pub subscription: SubscriptionInfo,
}

#[derive(CandidType, Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct SubscriptionRemoved {
pub user_id: UserId,
pub p256dh_key: String,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use crate::UserIndexEvent;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Args {
pub events: Vec<UserIndexEvent>,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum Response {
Success,
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod add_notifications_canister;
pub mod c2c_sync_user_index_events;
pub mod push_subscription;
pub mod remove_subscription;
pub mod remove_subscriptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ use notifications_index_canister::*;

// Updates
generate_c2c_call!(add_notifications_canister);
generate_c2c_call!(c2c_sync_user_index_events);
1 change: 1 addition & 0 deletions backend/canisters/notifications_index/impl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ stable_memory_map = { path = "../../../libraries/stable_memory_map" }
timer_job_queues = { path = "../../../libraries/timer_job_queues" }
tracing = { workspace = true }
types = { path = "../../../libraries/types" }
user_ids_set = { path = "../../../libraries/user_ids_set" }
user_index_canister = { path = "../../user_index/api" }
user_index_canister_c2c_client = { path = "../../user_index/c2c_client" }
utils = { path = "../../../libraries/utils" }
8 changes: 8 additions & 0 deletions backend/canisters/notifications_index/impl/src/guards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ pub fn caller_is_governance_principal() -> Result<(), String> {
}
}

pub fn caller_is_user_index_canister() -> Result<(), String> {
if read_state(|state| state.is_caller_user_index_canister()) {
Ok(())
} else {
Err("Caller is not the UserIndex canister".to_string())
}
}

pub fn caller_is_registry_canister() -> Result<(), String> {
if read_state(|state| state.is_caller_registry_canister()) {
Ok(())
Expand Down
13 changes: 13 additions & 0 deletions backend/canisters/notifications_index/impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ use canister_state_macros::canister_state;
use notifications_index_canister::{NotificationsIndexEvent, SubscriptionAdded, SubscriptionRemoved};
use principal_to_user_id_map::PrincipalToUserIdMap;
use serde::{Deserialize, Serialize};
use stable_memory_map::UserIdsKeyPrefix;
use std::cell::RefCell;
use std::collections::{BTreeMap, HashMap, HashSet};
use timer_job_queues::GroupedTimerJobQueue;
use types::{BuildVersion, CanisterId, CanisterWasm, Cycles, SubscriptionInfo, TimestampMillis, Timestamped, UserId};
use user_ids_set::UserIdsSet;
use utils::canister::CanistersRequiringUpgrade;
use utils::canister_event_sync_queue::CanisterEventSyncQueue;
use utils::env::Environment;
Expand Down Expand Up @@ -43,6 +45,10 @@ impl RuntimeState {
self.data.governance_principals.contains(&caller)
}

pub fn is_caller_user_index_canister(&self) -> bool {
self.env.caller() == self.data.user_index_canister_id
}

pub fn is_caller_registry_canister(&self) -> bool {
self.env.caller() == self.data.registry_canister_id
}
Expand Down Expand Up @@ -134,6 +140,8 @@ struct Data {
pub notifications_index_event_sync_queue: CanisterEventSyncQueue<NotificationsIndexEvent>,
#[serde(default = "notification_canisters_event_sync_queue")]
pub notification_canisters_event_sync_queue: GroupedTimerJobQueue<NotificationCanistersEventBatch>,
#[serde(default = "blocked_users")]
pub blocked_users: UserIdsSet,
pub rng_seed: [u8; 32],
pub test_mode: bool,
}
Expand All @@ -142,6 +150,10 @@ fn notification_canisters_event_sync_queue() -> GroupedTimerJobQueue<Notificatio
GroupedTimerJobQueue::new(5, false)
}

fn blocked_users() -> UserIdsSet {
UserIdsSet::new(UserIdsKeyPrefix::new_for_blocked_users())
}

impl Data {
pub fn new(
governance_principals: Vec<Principal>,
Expand All @@ -166,6 +178,7 @@ impl Data {
canisters_requiring_upgrade: CanistersRequiringUpgrade::default(),
notifications_index_event_sync_queue: CanisterEventSyncQueue::default(),
notification_canisters_event_sync_queue: GroupedTimerJobQueue::new(5, false),
blocked_users: UserIdsSet::new(UserIdsKeyPrefix::new_for_blocked_users()),
rng_seed: [0; 32],
test_mode,
}
Expand Down
Loading

0 comments on commit 6974741

Please sign in to comment.