diff --git a/Cargo.lock b/Cargo.lock index c5387f910f..d8457c8902 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5080,8 +5080,10 @@ dependencies = [ "serde", "serde_bytes", "stable_memory", + "stable_memory_map", "tracing", "types", + "user_ids_set", "utils 0.1.0", ] @@ -5149,6 +5151,7 @@ dependencies = [ "timer_job_queues", "tracing", "types", + "user_ids_set", "user_index_canister", "user_index_canister_c2c_client", "utils 0.1.0", @@ -8449,6 +8452,16 @@ dependencies = [ "utils 0.1.0", ] +[[package]] +name = "user_ids_set" +version = "0.1.0" +dependencies = [ + "ic_principal", + "serde", + "stable_memory_map", + "types", +] + [[package]] name = "user_index_canister" version = "0.1.0" @@ -8532,6 +8545,8 @@ dependencies = [ "modclub_canister", "modclub_canister_c2c_client", "msgpack", + "notifications_index_canister", + "notifications_index_canister_c2c_client", "online_users_canister", "online_users_canister_c2c_client", "p256_key_pair", diff --git a/Cargo.toml b/Cargo.toml index b3a958193d..34de76e670 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -153,6 +153,7 @@ members = [ "backend/libraries/ts_export", "backend/libraries/ts_export_macros", "backend/libraries/types", + "backend/libraries/user_ids_set", "backend/libraries/utils", "backend/notification_pusher/aws", "backend/notification_pusher/cli", diff --git a/backend/canisters/local_user_index/api/src/lib.rs b/backend/canisters/local_user_index/api/src/lib.rs index 367a412217..b7785cbe58 100644 --- a/backend/canisters/local_user_index/api/src/lib.rs +++ b/backend/canisters/local_user_index/api/src/lib.rs @@ -261,6 +261,8 @@ pub enum UserEvent { NotifyChit(NotifyChit), NotifyStreakInsurancePayment(UserCanisterStreakInsurancePayment), NotifyStreakInsuranceClaim(UserCanisterStreakInsuranceClaim), + UserBlocked(UserId), + UserUnblocked(UserId), } #[derive(CandidType, Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq, Hash)] diff --git a/backend/canisters/local_user_index/impl/src/updates/c2c_notify_user_events.rs b/backend/canisters/local_user_index/impl/src/updates/c2c_notify_user_events.rs index 88d661d6b3..2a590c113d 100644 --- a/backend/canisters/local_user_index/impl/src/updates/c2c_notify_user_events.rs +++ b/backend/canisters/local_user_index/impl/src/updates/c2c_notify_user_events.rs @@ -45,5 +45,11 @@ fn handle_event(user_id: UserId, event: UserEvent, state: &mut RuntimeState) { new_days_claimed: claim.new_days_claimed, }))); } + UserEvent::UserBlocked(blocked) => { + state.push_event_to_user_index(UserIndexEvent::UserBlocked(user_id, blocked)); + } + UserEvent::UserUnblocked(unblocked) => { + state.push_event_to_user_index(UserIndexEvent::UserUnblocked(user_id, unblocked)); + } } } diff --git a/backend/canisters/notifications/api/src/updates/c2c_sync_index.rs b/backend/canisters/notifications/api/src/updates/c2c_sync_index.rs index abec042d06..3b1e03faa9 100644 --- a/backend/canisters/notifications/api/src/updates/c2c_sync_index.rs +++ b/backend/canisters/notifications/api/src/updates/c2c_sync_index.rs @@ -1,13 +1,12 @@ -use candid::CandidType; use notifications_index_canister::NotificationsIndexEvent; use serde::{Deserialize, Serialize}; -#[derive(CandidType, Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] pub struct Args { pub events: Vec, } -#[derive(CandidType, Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] pub enum Response { Success, } diff --git a/backend/canisters/notifications/impl/Cargo.toml b/backend/canisters/notifications/impl/Cargo.toml index 30a7d1b1ad..b07c084efd 100644 --- a/backend/canisters/notifications/impl/Cargo.toml +++ b/backend/canisters/notifications/impl/Cargo.toml @@ -27,6 +27,8 @@ rand = { workspace = true } serde = { workspace = true } serde_bytes = { workspace = true } stable_memory = { path = "../../../libraries/stable_memory" } +stable_memory_map = { path = "../../../libraries/stable_memory_map" } tracing = { workspace = true } types = { path = "../../../libraries/types" } +user_ids_set = { path = "../../../libraries/user_ids_set" } utils = { path = "../../../libraries/utils" } diff --git a/backend/canisters/notifications/impl/src/lib.rs b/backend/canisters/notifications/impl/src/lib.rs index 235303cd51..c4fdc1cecd 100644 --- a/backend/canisters/notifications/impl/src/lib.rs +++ b/backend/canisters/notifications/impl/src/lib.rs @@ -3,9 +3,11 @@ use crate::model::subscriptions::Subscriptions; use candid::Principal; use canister_state_macros::canister_state; use serde::{Deserialize, Serialize}; +use stable_memory_map::UserIdsKeyPrefix; use std::cell::RefCell; use std::collections::{BTreeMap, HashSet}; use types::{BuildVersion, CanisterId, Cycles, NotificationEnvelope, TimestampMillis, Timestamped}; +use user_ids_set::UserIdsSet; use utils::env::Environment; use utils::event_stream::EventStream; @@ -71,10 +73,16 @@ struct Data { pub cycles_dispenser_canister_id: CanisterId, pub notifications: EventStream, pub subscriptions: Subscriptions, + #[serde(default = "blocked_users")] + pub blocked_users: UserIdsSet, pub rng_seed: [u8; 32], pub test_mode: bool, } +fn blocked_users() -> UserIdsSet { + UserIdsSet::new(UserIdsKeyPrefix::new_for_blocked_users()) +} + impl Data { pub fn new( notifications_index_canister_id: CanisterId, @@ -90,6 +98,7 @@ impl Data { cycles_dispenser_canister_id, notifications: EventStream::default(), subscriptions: Subscriptions::default(), + blocked_users: UserIdsSet::new(UserIdsKeyPrefix::new_for_blocked_users()), rng_seed: [0; 32], test_mode, } diff --git a/backend/canisters/notifications/impl/src/lifecycle/init.rs b/backend/canisters/notifications/impl/src/lifecycle/init.rs index f0673db4fc..8d70c258d7 100644 --- a/backend/canisters/notifications/impl/src/lifecycle/init.rs +++ b/backend/canisters/notifications/impl/src/lifecycle/init.rs @@ -1,4 +1,5 @@ use crate::lifecycle::{init_env, init_state}; +use crate::memory::get_stable_memory_map_memory; use crate::Data; use canister_tracing_macros::trace; use ic_cdk::init; @@ -10,6 +11,7 @@ use utils::cycles::init_cycles_dispenser_client; #[trace] fn init(args: Args) { canister_logger::init(args.test_mode); + stable_memory_map::init(get_stable_memory_map_memory()); init_cycles_dispenser_client(args.cycles_dispenser_canister_id, args.test_mode); let env = init_env([0; 32]); diff --git a/backend/canisters/notifications/impl/src/lifecycle/post_upgrade.rs b/backend/canisters/notifications/impl/src/lifecycle/post_upgrade.rs index 97fd6e2b05..f0f97a9161 100644 --- a/backend/canisters/notifications/impl/src/lifecycle/post_upgrade.rs +++ b/backend/canisters/notifications/impl/src/lifecycle/post_upgrade.rs @@ -1,5 +1,5 @@ use crate::lifecycle::{init_env, init_state}; -use crate::memory::get_upgrades_memory; +use crate::memory::{get_stable_memory_map_memory, get_upgrades_memory}; use crate::Data; use canister_logger::LogEntry; use canister_tracing_macros::trace; @@ -12,6 +12,8 @@ use utils::cycles::init_cycles_dispenser_client; #[post_upgrade] #[trace] fn post_upgrade(args: Args) { + stable_memory_map::init(get_stable_memory_map_memory()); + let memory = get_upgrades_memory(); let reader = get_reader(&memory); diff --git a/backend/canisters/notifications/impl/src/memory.rs b/backend/canisters/notifications/impl/src/memory.rs index ae5fb3759a..056dc8683c 100644 --- a/backend/canisters/notifications/impl/src/memory.rs +++ b/backend/canisters/notifications/impl/src/memory.rs @@ -5,6 +5,7 @@ use ic_stable_structures::{ use std::collections::BTreeMap; const UPGRADES: MemoryId = MemoryId::new(0); +const STABLE_MEMORY_MAP: MemoryId = MemoryId::new(3); pub type Memory = VirtualMemory; @@ -17,8 +18,12 @@ pub fn get_upgrades_memory() -> Memory { get_memory(UPGRADES) } +pub fn get_stable_memory_map_memory() -> Memory { + get_memory(STABLE_MEMORY_MAP) +} + pub fn memory_sizes() -> BTreeMap { - (0u8..=0).map(|id| (id, get_memory(MemoryId::new(id)).size())).collect() + (0u8..=3).map(|id| (id, get_memory(MemoryId::new(id)).size())).collect() } fn get_memory(id: MemoryId) -> Memory { diff --git a/backend/canisters/notifications/impl/src/updates/c2c_push_notification.rs b/backend/canisters/notifications/impl/src/updates/c2c_push_notification.rs index b48c71fd3b..8aeb4ea67a 100644 --- a/backend/canisters/notifications/impl/src/updates/c2c_push_notification.rs +++ b/backend/canisters/notifications/impl/src/updates/c2c_push_notification.rs @@ -4,6 +4,7 @@ use canister_api_macros::update; use canister_tracing_macros::trace; use notifications_canister::c2c_push_notification::{Response::*, *}; use serde_bytes::ByteBuf; +use std::collections::HashSet; use types::{CanPushNotificationsArgs, CanPushNotificationsResponse, CanisterId, NotificationEnvelope, UserId}; #[update(msgpack = true)] @@ -25,7 +26,7 @@ async fn c2c_push_notification(args: Args) -> Response { _ => {} } - mutate_state(|state| c2c_push_notification_impl(args.recipients, args.notification_bytes, state)) + mutate_state(|state| c2c_push_notification_impl(args.sender, args.recipients, args.notification_bytes, state)) } enum CanPushNotificationsResult { @@ -48,10 +49,19 @@ fn can_push_notifications(args: &Args, state: &RuntimeState) -> CanPushNotificat CanPushNotificationsResult::Blocked } -fn c2c_push_notification_impl(recipients: Vec, notification_bytes: ByteBuf, state: &mut RuntimeState) -> Response { +fn c2c_push_notification_impl( + sender: Option, + recipients: Vec, + notification_bytes: ByteBuf, + state: &mut RuntimeState, +) -> Response { + let users_who_have_blocked_sender: HashSet<_> = sender + .map(|s| state.data.blocked_users.all_linked_users(s)) + .unwrap_or_default(); + let filtered_recipients: Vec<_> = recipients .into_iter() - .filter(|u| state.data.subscriptions.any_for_user(u)) + .filter(|u| state.data.subscriptions.any_for_user(u) && !users_who_have_blocked_sender.contains(u)) .collect(); if !filtered_recipients.is_empty() { diff --git a/backend/canisters/notifications/impl/src/updates/c2c_sync_index.rs b/backend/canisters/notifications/impl/src/updates/c2c_sync_index.rs index 8ffd081c43..59f36d0ea7 100644 --- a/backend/canisters/notifications/impl/src/updates/c2c_sync_index.rs +++ b/backend/canisters/notifications/impl/src/updates/c2c_sync_index.rs @@ -4,6 +4,7 @@ use canister_api_macros::update; use canister_tracing_macros::trace; use notifications_canister::c2c_sync_index::{Response::*, *}; use notifications_index_canister::NotificationsIndexEvent; +use stable_memory_map::StableMemoryMap; #[update(guard = "caller_is_notifications_index", msgpack = true)] #[trace] @@ -23,6 +24,12 @@ fn c2c_sync_index_impl(args: Args, state: &mut RuntimeState) -> Response { NotificationsIndexEvent::AllSubscriptionsRemoved(u) => { state.data.subscriptions.remove_all(u); } + NotificationsIndexEvent::UserBlocked(user_id, blocked) => { + state.data.blocked_users.insert((blocked, user_id), ()); + } + NotificationsIndexEvent::UserUnblocked(user_id, unblocked) => { + state.data.blocked_users.remove(&(unblocked, user_id)); + } } } Success diff --git a/backend/canisters/notifications_index/api/src/lib.rs b/backend/canisters/notifications_index/api/src/lib.rs index 2e59aef1ba..87f20f21ce 100644 --- a/backend/canisters/notifications_index/api/src/lib.rs +++ b/backend/canisters/notifications_index/api/src/lib.rs @@ -6,24 +6,31 @@ pub use lifecycle::*; pub use queries::*; pub use updates::*; -use candid::CandidType; use serde::{Deserialize, Serialize}; use types::{SubscriptionInfo, UserId}; -#[derive(CandidType, Serialize, Deserialize, Clone, Debug)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub enum NotificationsIndexEvent { SubscriptionAdded(SubscriptionAdded), SubscriptionRemoved(SubscriptionRemoved), AllSubscriptionsRemoved(UserId), + UserBlocked(UserId, UserId), + UserUnblocked(UserId, UserId), } -#[derive(CandidType, Serialize, Deserialize, Clone, Debug)] +#[derive(Serialize, Deserialize, Clone, Debug)] +pub enum UserIndexEvent { + UserBlocked(UserId, UserId), + UserUnblocked(UserId, UserId), +} + +#[derive(Serialize, Deserialize, Clone, Debug)] pub struct SubscriptionAdded { pub user_id: UserId, pub subscription: SubscriptionInfo, } -#[derive(CandidType, Serialize, Deserialize, Clone, Debug)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub struct SubscriptionRemoved { pub user_id: UserId, pub p256dh_key: String, diff --git a/backend/canisters/notifications_index/api/src/updates/c2c_sync_user_index_events.rs b/backend/canisters/notifications_index/api/src/updates/c2c_sync_user_index_events.rs new file mode 100644 index 0000000000..f5963c5f8a --- /dev/null +++ b/backend/canisters/notifications_index/api/src/updates/c2c_sync_user_index_events.rs @@ -0,0 +1,12 @@ +use crate::UserIndexEvent; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct Args { + pub events: Vec, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub enum Response { + Success, +} diff --git a/backend/canisters/notifications_index/api/src/updates/mod.rs b/backend/canisters/notifications_index/api/src/updates/mod.rs index 50814c340a..dfc38c4c0c 100644 --- a/backend/canisters/notifications_index/api/src/updates/mod.rs +++ b/backend/canisters/notifications_index/api/src/updates/mod.rs @@ -1,4 +1,5 @@ pub mod add_notifications_canister; +pub mod c2c_sync_user_index_events; pub mod push_subscription; pub mod remove_subscription; pub mod remove_subscriptions; diff --git a/backend/canisters/notifications_index/c2c_client/src/lib.rs b/backend/canisters/notifications_index/c2c_client/src/lib.rs index b076c50a66..aa1a9325fc 100644 --- a/backend/canisters/notifications_index/c2c_client/src/lib.rs +++ b/backend/canisters/notifications_index/c2c_client/src/lib.rs @@ -5,3 +5,4 @@ use notifications_index_canister::*; // Updates generate_c2c_call!(add_notifications_canister); +generate_c2c_call!(c2c_sync_user_index_events); diff --git a/backend/canisters/notifications_index/impl/Cargo.toml b/backend/canisters/notifications_index/impl/Cargo.toml index 1d5724da91..7be58ab180 100644 --- a/backend/canisters/notifications_index/impl/Cargo.toml +++ b/backend/canisters/notifications_index/impl/Cargo.toml @@ -35,6 +35,7 @@ stable_memory_map = { path = "../../../libraries/stable_memory_map" } timer_job_queues = { path = "../../../libraries/timer_job_queues" } tracing = { workspace = true } types = { path = "../../../libraries/types" } +user_ids_set = { path = "../../../libraries/user_ids_set" } user_index_canister = { path = "../../user_index/api" } user_index_canister_c2c_client = { path = "../../user_index/c2c_client" } utils = { path = "../../../libraries/utils" } diff --git a/backend/canisters/notifications_index/impl/src/guards.rs b/backend/canisters/notifications_index/impl/src/guards.rs index ef906e2078..c0e7d1f241 100644 --- a/backend/canisters/notifications_index/impl/src/guards.rs +++ b/backend/canisters/notifications_index/impl/src/guards.rs @@ -8,6 +8,14 @@ pub fn caller_is_governance_principal() -> Result<(), String> { } } +pub fn caller_is_user_index_canister() -> Result<(), String> { + if read_state(|state| state.is_caller_user_index_canister()) { + Ok(()) + } else { + Err("Caller is not the UserIndex canister".to_string()) + } +} + pub fn caller_is_registry_canister() -> Result<(), String> { if read_state(|state| state.is_caller_registry_canister()) { Ok(()) diff --git a/backend/canisters/notifications_index/impl/src/lib.rs b/backend/canisters/notifications_index/impl/src/lib.rs index ee5bfc8e2c..8d8d4e1d87 100644 --- a/backend/canisters/notifications_index/impl/src/lib.rs +++ b/backend/canisters/notifications_index/impl/src/lib.rs @@ -6,10 +6,12 @@ use canister_state_macros::canister_state; use notifications_index_canister::{NotificationsIndexEvent, SubscriptionAdded, SubscriptionRemoved}; use principal_to_user_id_map::PrincipalToUserIdMap; use serde::{Deserialize, Serialize}; +use stable_memory_map::UserIdsKeyPrefix; use std::cell::RefCell; use std::collections::{BTreeMap, HashMap, HashSet}; use timer_job_queues::GroupedTimerJobQueue; use types::{BuildVersion, CanisterId, CanisterWasm, Cycles, SubscriptionInfo, TimestampMillis, Timestamped, UserId}; +use user_ids_set::UserIdsSet; use utils::canister::CanistersRequiringUpgrade; use utils::canister_event_sync_queue::CanisterEventSyncQueue; use utils::env::Environment; @@ -43,6 +45,10 @@ impl RuntimeState { self.data.governance_principals.contains(&caller) } + pub fn is_caller_user_index_canister(&self) -> bool { + self.env.caller() == self.data.user_index_canister_id + } + pub fn is_caller_registry_canister(&self) -> bool { self.env.caller() == self.data.registry_canister_id } @@ -134,6 +140,8 @@ struct Data { pub notifications_index_event_sync_queue: CanisterEventSyncQueue, #[serde(default = "notification_canisters_event_sync_queue")] pub notification_canisters_event_sync_queue: GroupedTimerJobQueue, + #[serde(default = "blocked_users")] + pub blocked_users: UserIdsSet, pub rng_seed: [u8; 32], pub test_mode: bool, } @@ -142,6 +150,10 @@ fn notification_canisters_event_sync_queue() -> GroupedTimerJobQueue UserIdsSet { + UserIdsSet::new(UserIdsKeyPrefix::new_for_blocked_users()) +} + impl Data { pub fn new( governance_principals: Vec, @@ -166,6 +178,7 @@ impl Data { canisters_requiring_upgrade: CanistersRequiringUpgrade::default(), notifications_index_event_sync_queue: CanisterEventSyncQueue::default(), notification_canisters_event_sync_queue: GroupedTimerJobQueue::new(5, false), + blocked_users: UserIdsSet::new(UserIdsKeyPrefix::new_for_blocked_users()), rng_seed: [0; 32], test_mode, } diff --git a/backend/canisters/notifications_index/impl/src/updates/c2c_sync_user_index_events.rs b/backend/canisters/notifications_index/impl/src/updates/c2c_sync_user_index_events.rs new file mode 100644 index 0000000000..2f8e155b1a --- /dev/null +++ b/backend/canisters/notifications_index/impl/src/updates/c2c_sync_user_index_events.rs @@ -0,0 +1,29 @@ +use crate::guards::caller_is_user_index_canister; +use crate::{mutate_state, RuntimeState}; +use canister_api_macros::update; +use canister_tracing_macros::trace; +use notifications_index_canister::c2c_sync_user_index_events::{Response::*, *}; +use notifications_index_canister::{NotificationsIndexEvent, UserIndexEvent}; +use stable_memory_map::StableMemoryMap; + +#[update(guard = "caller_is_user_index_canister", msgpack = true)] +#[trace] +fn c2c_sync_user_index_events(args: Args) -> Response { + mutate_state(|state| c2c_sync_user_index_events_impl(args, state)) +} + +fn c2c_sync_user_index_events_impl(args: Args, state: &mut RuntimeState) -> Response { + for event in args.events { + match event { + UserIndexEvent::UserBlocked(user_id, blocked) => { + state.data.blocked_users.insert((blocked, user_id), ()); + state.push_event_to_notifications_canisters(NotificationsIndexEvent::UserBlocked(user_id, blocked)); + } + UserIndexEvent::UserUnblocked(user_id, unblocked) => { + state.data.blocked_users.remove(&(unblocked, user_id)); + state.push_event_to_notifications_canisters(NotificationsIndexEvent::UserUnblocked(user_id, unblocked)); + } + } + } + Success +} diff --git a/backend/canisters/notifications_index/impl/src/updates/mod.rs b/backend/canisters/notifications_index/impl/src/updates/mod.rs index 27e6fe6c77..0d376e0cc5 100644 --- a/backend/canisters/notifications_index/impl/src/updates/mod.rs +++ b/backend/canisters/notifications_index/impl/src/updates/mod.rs @@ -1,4 +1,5 @@ mod add_notifications_canister; +mod c2c_sync_user_index_events; mod push_subscription; mod remove_subscription; mod remove_subscriptions; diff --git a/backend/canisters/user/impl/src/lib.rs b/backend/canisters/user/impl/src/lib.rs index 307093bb2d..6c418b927c 100644 --- a/backend/canisters/user/impl/src/lib.rs +++ b/backend/canisters/user/impl/src/lib.rs @@ -385,12 +385,14 @@ impl Data { pub fn block_user(&mut self, user_id: UserId, now: TimestampMillis) { if self.blocked_users.value.insert(user_id) { self.blocked_users.timestamp = now; + self.push_local_user_index_canister_event(LocalUserIndexEvent::UserBlocked(user_id)); } } - pub fn unblock_user(&mut self, user_id: &UserId, now: TimestampMillis) { - if self.blocked_users.value.remove(user_id) { + pub fn unblock_user(&mut self, user_id: UserId, now: TimestampMillis) { + if self.blocked_users.value.remove(&user_id) { self.blocked_users.timestamp = now; + self.push_local_user_index_canister_event(LocalUserIndexEvent::UserUnblocked(user_id)); } } diff --git a/backend/canisters/user/impl/src/updates/unblock_user.rs b/backend/canisters/user/impl/src/updates/unblock_user.rs index 5644a96b5a..df5a693575 100644 --- a/backend/canisters/user/impl/src/updates/unblock_user.rs +++ b/backend/canisters/user/impl/src/updates/unblock_user.rs @@ -18,6 +18,6 @@ fn unblock_user_impl(args: Args, state: &mut RuntimeState) -> Response { } let now = state.env.now(); - state.data.unblock_user(&args.user_id, now); + state.data.unblock_user(args.user_id, now); Success } diff --git a/backend/canisters/user_index/api/src/lib.rs b/backend/canisters/user_index/api/src/lib.rs index bba914d898..d9cebd8bff 100644 --- a/backend/canisters/user_index/api/src/lib.rs +++ b/backend/canisters/user_index/api/src/lib.rs @@ -28,6 +28,8 @@ pub enum LocalUserIndexEvent { NotifyStreakInsuranceClaim(Box), BotInstalled(Box), BotUninstalled(Box), + UserBlocked(UserId, UserId), + UserUnblocked(UserId, UserId), } #[derive(Serialize, Deserialize, Clone, Debug)] diff --git a/backend/canisters/user_index/impl/Cargo.toml b/backend/canisters/user_index/impl/Cargo.toml index b84f69aa5c..b075df2d90 100644 --- a/backend/canisters/user_index/impl/Cargo.toml +++ b/backend/canisters/user_index/impl/Cargo.toml @@ -50,6 +50,8 @@ p256_key_pair = { path = "../../../libraries/p256_key_pair" } modclub_canister = { path = "../../../external_canisters/modclub/api" } modclub_canister_c2c_client = { path = "../../../external_canisters/modclub/c2c_client" } msgpack = { path = "../../../libraries/msgpack" } +notifications_index_canister = { path = "../../notifications_index/api" } +notifications_index_canister_c2c_client = { path = "../../notifications_index/c2c_client" } online_users_canister = { path = "../../online_users/api" } online_users_canister_c2c_client = { path = "../../online_users/c2c_client" } proof_of_unique_personhood = { path = "../../../libraries/proof_of_unique_personhood" } diff --git a/backend/canisters/user_index/impl/src/lib.rs b/backend/canisters/user_index/impl/src/lib.rs index 4acd2e0494..57a96affd7 100644 --- a/backend/canisters/user_index/impl/src/lib.rs +++ b/backend/canisters/user_index/impl/src/lib.rs @@ -1,4 +1,5 @@ use crate::model::local_user_index_map::LocalUserIndex; +use crate::model::notifications_index_event_batch::NotificationsIndexEventBatch; use crate::model::storage_index_user_config_batch::StorageIndexUserConfigBatch; use crate::model::storage_index_users_to_remove_batch::StorageIndexUsersToRemoveBatch; use crate::model::streak_insurance_logs::StreakInsuranceLogs; @@ -353,6 +354,8 @@ struct Data { pub storage_index_user_sync_queue: GroupedTimerJobQueue, pub storage_index_users_to_remove_queue: GroupedTimerJobQueue, pub user_index_event_sync_queue: CanisterEventSyncQueue, + #[serde(default = "notifications_index_event_sync_queue")] + pub notifications_index_event_sync_queue: GroupedTimerJobQueue, pub pending_payments_queue: PendingPaymentsQueue, pub pending_modclub_submissions_queue: PendingModclubSubmissionsQueue, pub platform_moderators: HashSet, @@ -387,6 +390,10 @@ struct Data { pub streak_insurance_logs: StreakInsuranceLogs, } +fn notifications_index_event_sync_queue() -> GroupedTimerJobQueue { + GroupedTimerJobQueue::new(1, false) +} + impl Data { #[allow(clippy::too_many_arguments)] pub fn new( @@ -434,6 +441,7 @@ impl Data { storage_index_user_sync_queue: GroupedTimerJobQueue::new(1, false), storage_index_users_to_remove_queue: GroupedTimerJobQueue::new(1, false), user_index_event_sync_queue: CanisterEventSyncQueue::default(), + notifications_index_event_sync_queue: GroupedTimerJobQueue::new(1, false), pending_payments_queue: PendingPaymentsQueue::default(), pending_modclub_submissions_queue: PendingModclubSubmissionsQueue::default(), platform_moderators: HashSet::new(), @@ -546,6 +554,7 @@ impl Default for Data { storage_index_user_sync_queue: GroupedTimerJobQueue::new(1, false), storage_index_users_to_remove_queue: GroupedTimerJobQueue::new(1, false), user_index_event_sync_queue: CanisterEventSyncQueue::default(), + notifications_index_event_sync_queue: GroupedTimerJobQueue::new(1, false), pending_payments_queue: PendingPaymentsQueue::default(), pending_modclub_submissions_queue: PendingModclubSubmissionsQueue::default(), platform_moderators: HashSet::new(), diff --git a/backend/canisters/user_index/impl/src/model/mod.rs b/backend/canisters/user_index/impl/src/model/mod.rs index d55c40950b..56f5a8ba79 100644 --- a/backend/canisters/user_index/impl/src/model/mod.rs +++ b/backend/canisters/user_index/impl/src/model/mod.rs @@ -3,6 +3,7 @@ pub mod chit_leaderboard; pub mod diamond_membership_details; pub mod external_achievements; pub mod local_user_index_map; +pub mod notifications_index_event_batch; pub mod pending_modclub_submissions_queue; pub mod pending_payments_queue; pub mod reported_messages; diff --git a/backend/canisters/user_index/impl/src/model/notifications_index_event_batch.rs b/backend/canisters/user_index/impl/src/model/notifications_index_event_batch.rs new file mode 100644 index 0000000000..7e1391da7e --- /dev/null +++ b/backend/canisters/user_index/impl/src/model/notifications_index_event_batch.rs @@ -0,0 +1,26 @@ +use notifications_index_canister::UserIndexEvent; +use timer_job_queues::{grouped_timer_job_batch, TimerJobItem}; +use types::CanisterId; +use utils::canister::should_retry_failed_c2c_call; + +grouped_timer_job_batch!(NotificationsIndexEventBatch, CanisterId, UserIndexEvent, 100); + +impl TimerJobItem for NotificationsIndexEventBatch { + async fn process(&self) -> Result<(), bool> { + let response = notifications_index_canister_c2c_client::c2c_sync_user_index_events( + self.key, + ¬ifications_index_canister::c2c_sync_user_index_events::Args { + events: self.items.clone(), + }, + ) + .await; + + match response { + Ok(_) => Ok(()), + Err((code, msg)) => { + let retry = should_retry_failed_c2c_call(code, &msg); + Err(retry) + } + } + } +} diff --git a/backend/canisters/user_index/impl/src/updates/c2c_notify_events.rs b/backend/canisters/user_index/impl/src/updates/c2c_notify_events.rs index 199a2c66cb..f7935c7948 100644 --- a/backend/canisters/user_index/impl/src/updates/c2c_notify_events.rs +++ b/backend/canisters/user_index/impl/src/updates/c2c_notify_events.rs @@ -130,6 +130,14 @@ fn handle_event(event: LocalUserIndexEvent, caller: Principal, now: TimestampMil LocalUserIndexEvent::BotUninstalled(ev) => { state.data.users.remove_bot_installation(ev.bot_id, &ev.location); } + LocalUserIndexEvent::UserBlocked(user_id, blocked) => state.data.notifications_index_event_sync_queue.push( + state.data.notifications_index_canister_id, + notifications_index_canister::UserIndexEvent::UserBlocked(user_id, blocked), + ), + LocalUserIndexEvent::UserUnblocked(user_id, unblocked) => state.data.notifications_index_event_sync_queue.push( + state.data.notifications_index_canister_id, + notifications_index_canister::UserIndexEvent::UserUnblocked(user_id, unblocked), + ), } } diff --git a/backend/integration_tests/src/client/notifications.rs b/backend/integration_tests/src/client/notifications.rs index 20d9c84871..4ebd41567b 100644 --- a/backend/integration_tests/src/client/notifications.rs +++ b/backend/integration_tests/src/client/notifications.rs @@ -6,3 +6,29 @@ generate_query_call!(latest_notification_index); generate_query_call!(notifications); // Updates + +pub mod happy_path { + use candid::Principal; + use notifications_canister::notifications::SuccessResult; + use pocket_ic::PocketIc; + use types::CanisterId; + + pub fn notifications( + env: &PocketIc, + sender: Principal, + notifications_canister_id: CanisterId, + from_index: u64, + ) -> SuccessResult { + let response = super::notifications( + env, + sender, + notifications_canister_id, + ¬ifications_canister::notifications::Args { + from_notification_index: from_index, + }, + ); + + let notifications_canister::notifications::Response::Success(result) = response; + result + } +} diff --git a/backend/integration_tests/src/client/user.rs b/backend/integration_tests/src/client/user.rs index a13fdf3a40..8495311e9c 100644 --- a/backend/integration_tests/src/client/user.rs +++ b/backend/integration_tests/src/client/user.rs @@ -515,4 +515,26 @@ pub mod happy_path { response => panic!("'approve_transfer' error: {response:?}"), }; } + + pub fn block_user(env: &mut PocketIc, user: &User, blocked: UserId) { + let response = super::block_user( + env, + user.principal, + user.canister(), + &user_canister::block_user::Args { user_id: blocked }, + ); + + assert!(matches!(response, user_canister::block_user::Response::Success)); + } + + pub fn unblock_user(env: &mut PocketIc, user: &User, unblocked: UserId) { + let response = super::unblock_user( + env, + user.principal, + user.canister(), + &user_canister::unblock_user::Args { user_id: unblocked }, + ); + + assert!(matches!(response, user_canister::unblock_user::Response::Success)); + } } diff --git a/backend/integration_tests/src/notification_tests.rs b/backend/integration_tests/src/notification_tests.rs index 7a53754306..f17b7195f5 100644 --- a/backend/integration_tests/src/notification_tests.rs +++ b/backend/integration_tests/src/notification_tests.rs @@ -1,4 +1,5 @@ use crate::env::ENV; +use crate::utils::tick_many; use crate::{client, CanisterIds, TestEnv, User}; use candid::Principal; use itertools::Itertools; @@ -25,13 +26,11 @@ fn direct_message_notification_succeeds() { client::user::happy_path::send_text_message(env, &user1, user2.user_id, random_string(), None); - let notifications_canister::notifications::Response::Success(notifications_response) = client::notifications::notifications( + let notifications_response = client::notifications::happy_path::notifications( env, *controller, notifications_canister, - ¬ifications_canister::notifications::Args { - from_notification_index: latest_notification_index + 1, - }, + latest_notification_index + 1, ); assert_eq!(notifications_response.notifications.len(), 1); @@ -64,13 +63,11 @@ fn group_message_notification_succeeds() { client::group::happy_path::send_text_message(env, &user1, group_id, None, random_string(), None); - let notifications_canister::notifications::Response::Success(notifications_response) = client::notifications::notifications( + let notifications_response = client::notifications::happy_path::notifications( env, *controller, notifications_canister, - ¬ifications_canister::notifications::Args { - from_notification_index: latest_notification_index + 1, - }, + latest_notification_index + 1, ); // There should be 2 notifications (1 for being added to the group, 1 for the message) @@ -105,13 +102,11 @@ fn direct_message_notification_muted() { client::user::happy_path::send_text_message(env, &user1, user2.user_id, random_string(), None); - let notifications_canister::notifications::Response::Success(notifications_response) = client::notifications::notifications( + let notifications_response = client::notifications::happy_path::notifications( env, *controller, notifications_canister, - ¬ifications_canister::notifications::Args { - from_notification_index: latest_notification_index + 1, - }, + latest_notification_index + 1, ); assert!(notifications_response.notifications.is_empty()); @@ -188,13 +183,11 @@ fn group_message_notification_muted(case: u32) { }, ); - let notifications_canister::notifications::Response::Success(notifications_response) = client::notifications::notifications( + let notifications_response = client::notifications::happy_path::notifications( env, *controller, notifications_canister, - ¬ifications_canister::notifications::Args { - from_notification_index: latest_notification_index + 1, - }, + latest_notification_index + 1, ); if case == 1 { @@ -234,15 +227,12 @@ fn only_store_up_to_10_subscriptions_per_user() { client::user::happy_path::send_text_message(env, &user1, user2.user_id, random_string(), None); - let notifications_canister::notifications::Response::Success(mut notifications_response) = - client::notifications::notifications( - env, - *controller, - notifications_canister, - ¬ifications_canister::notifications::Args { - from_notification_index: latest_notification_index + 1, - }, - ); + let mut notifications_response = client::notifications::happy_path::notifications( + env, + *controller, + notifications_canister, + latest_notification_index + 1, + ); let subscriptions = notifications_response.subscriptions.remove(&user2.user_id).unwrap(); @@ -252,6 +242,74 @@ fn only_store_up_to_10_subscriptions_per_user() { ); } +#[test] +fn notifications_blocked_from_blocked_users() { + let mut wrapper = ENV.deref().get(); + let TestEnv { + env, + canister_ids, + controller, + .. + } = wrapper.env(); + + let TestData { user1, user2 } = init_test_data(env, canister_ids); + + let group_id = client::user::happy_path::create_group(env, &user1, &random_string(), false, false); + client::local_user_index::happy_path::add_users_to_group( + env, + &user1, + canister_ids.local_user_index(env, group_id), + group_id, + vec![(user2.user_id, user2.principal)], + ); + + let notifications_canister = canister_ids.notifications(env, group_id); + let latest_notification_index = latest_notification_index(env, notifications_canister, *controller); + + client::group::happy_path::send_text_message(env, &user1, group_id, None, random_string(), None); + + let notifications_response = client::notifications::happy_path::notifications( + env, + *controller, + notifications_canister, + latest_notification_index + 1, + ); + + assert_eq!(notifications_response.notifications.len(), 1); + assert!(notifications_response.subscriptions.contains_key(&user2.user_id)); + + client::user::happy_path::block_user(env, &user2, user1.user_id); + + tick_many(env, 3); + + client::group::happy_path::send_text_message(env, &user1, group_id, None, random_string(), None); + + let notifications_response = client::notifications::happy_path::notifications( + env, + *controller, + notifications_canister, + latest_notification_index + 2, + ); + + assert!(notifications_response.notifications.is_empty()); + + client::user::happy_path::unblock_user(env, &user2, user1.user_id); + + tick_many(env, 3); + + client::group::happy_path::send_text_message(env, &user1, group_id, None, random_string(), None); + + let notifications_response = client::notifications::happy_path::notifications( + env, + *controller, + notifications_canister, + latest_notification_index + 2, + ); + + assert_eq!(notifications_response.notifications.len(), 1); + assert!(notifications_response.subscriptions.contains_key(&user2.user_id)); +} + fn latest_notification_index(env: &PocketIc, notifications_canister_id: Principal, controller: Principal) -> u64 { let notifications_canister::latest_notification_index::Response::Success(latest_notification_index) = client::notifications::latest_notification_index( diff --git a/backend/libraries/stable_memory_map/src/keys.rs b/backend/libraries/stable_memory_map/src/keys.rs index 08ad8d79ac..fbc8e64887 100644 --- a/backend/libraries/stable_memory_map/src/keys.rs +++ b/backend/libraries/stable_memory_map/src/keys.rs @@ -97,6 +97,7 @@ pub enum KeyType { FileReferenceCount = 13, FilesPerAccessor = 14, UserStorageRecord = 15, + BlockedUsers = 16, } fn extract_key_type(bytes: &[u8]) -> Option { @@ -123,6 +124,7 @@ impl TryFrom for KeyType { 13 => Ok(KeyType::FileReferenceCount), 14 => Ok(KeyType::FilesPerAccessor), 15 => Ok(KeyType::UserStorageRecord), + 16 => Ok(KeyType::BlockedUsers), _ => Err(()), } } diff --git a/backend/libraries/stable_memory_map/src/keys/user_id.rs b/backend/libraries/stable_memory_map/src/keys/user_id.rs index 4f5d9a37e1..1db3a7e153 100644 --- a/backend/libraries/stable_memory_map/src/keys/user_id.rs +++ b/backend/libraries/stable_memory_map/src/keys/user_id.rs @@ -65,6 +65,41 @@ impl UserIdKey { } } +key!(UserIdsKey, UserIdsKeyPrefix, KeyType::BlockedUsers); + +impl UserIdsKeyPrefix { + pub fn new_for_blocked_users() -> Self { + // KeyType::BlockedUsers 1 byte + UserIdsKeyPrefix(vec![KeyType::BlockedUsers as u8]) + } +} + +impl KeyPrefix for UserIdsKeyPrefix { + type Key = UserIdsKey; + type Suffix = (UserId, UserId); + + fn create_key(&self, (user_id1, user_id2): &(UserId, UserId)) -> Self::Key { + let user_id1_bytes = user_id1.as_slice(); + let user_id2_bytes = user_id2.as_slice(); + let mut bytes = Vec::with_capacity(self.0.len() + 1 + user_id1_bytes.len() + user_id2_bytes.len()); + bytes.extend_from_slice(self.0.as_slice()); + bytes.push(user_id1_bytes.len() as u8); + bytes.extend_from_slice(user_id1_bytes); + bytes.extend_from_slice(user_id2_bytes); + UserIdsKey(bytes) + } +} + +impl UserIdsKey { + pub fn user_ids(&self) -> (UserId, UserId) { + let user_id1_len = self.0[1] as usize; + let user_id1_end = 2 + user_id1_len; + let user_id1 = Principal::from_slice(&self.0[2..user_id1_end]).into(); + let user_id2 = Principal::from_slice(&self.0[user_id1_end..]).into(); + (user_id1, user_id2) + } +} + #[cfg(test)] mod tests { use super::*; @@ -137,4 +172,28 @@ mod tests { assert_eq!(deserialized.0, key.0); } } + + #[test] + fn blocked_users_key_e2e() { + for _ in 0..100 { + let user_id1_bytes: [u8; 10] = thread_rng().gen(); + let user_id1 = UserId::from(Principal::from_slice(&user_id1_bytes)); + let user_id2_bytes: [u8; 10] = thread_rng().gen(); + let user_id2 = UserId::from(Principal::from_slice(&user_id2_bytes)); + let prefix = UserIdsKeyPrefix::new_for_blocked_users(); + let key = BaseKey::from(prefix.create_key(&(user_id1, user_id2))); + let blocked_users_key = UserIdsKey::try_from(key.clone()).unwrap(); + + assert_eq!(*blocked_users_key.0.first().unwrap(), KeyType::BlockedUsers as u8); + assert_eq!(blocked_users_key.0.len(), 22); + assert!(blocked_users_key.matches_prefix(&prefix)); + assert_eq!(blocked_users_key.user_ids(), (user_id1, user_id2)); + + let serialized = msgpack::serialize_then_unwrap(&blocked_users_key); + assert_eq!(serialized.len(), blocked_users_key.0.len() + 2); + let deserialized: UserIdsKey = msgpack::deserialize_then_unwrap(&serialized); + assert_eq!(deserialized, blocked_users_key); + assert_eq!(deserialized.0, key.0); + } + } } diff --git a/backend/libraries/user_ids_set/Cargo.toml b/backend/libraries/user_ids_set/Cargo.toml new file mode 100644 index 0000000000..37901a3154 --- /dev/null +++ b/backend/libraries/user_ids_set/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "user_ids_set" +version = "0.1.0" +edition = "2021" + +[dependencies] +ic_principal = { workspace = true } +stable_memory_map = { path = "../stable_memory_map" } +types = { path = "../types" } +serde = { workspace = true } diff --git a/backend/libraries/user_ids_set/src/lib.rs b/backend/libraries/user_ids_set/src/lib.rs new file mode 100644 index 0000000000..fce63ae213 --- /dev/null +++ b/backend/libraries/user_ids_set/src/lib.rs @@ -0,0 +1,55 @@ +use ic_principal::Principal; +use serde::{Deserialize, Serialize}; +use stable_memory_map::{with_map, KeyPrefix, LazyValue, StableMemoryMap, UserIdsKeyPrefix}; +use types::UserId; + +#[derive(Serialize, Deserialize)] +pub struct UserIdsSet { + prefix: UserIdsKeyPrefix, + len: u32, +} + +impl StableMemoryMap for UserIdsSet { + fn prefix(&self) -> &UserIdsKeyPrefix { + &self.prefix + } + + fn value_to_bytes(_value: ()) -> Vec { + Vec::new() + } + + fn bytes_to_value(_key: &(UserId, UserId), _bytes: Vec) {} + + fn on_inserted(&mut self, _key: &(UserId, UserId), existing: &Option>) { + if existing.is_none() { + self.len = self.len.saturating_add(1); + } + } + + fn on_removed(&mut self, _key: &(UserId, UserId), _removed: &LazyValue<(UserId, UserId), ()>) { + self.len = self.len.saturating_sub(1); + } +} + +impl UserIdsSet { + pub fn new(prefix: UserIdsKeyPrefix) -> Self { + Self { prefix, len: 0 } + } + + pub fn len(&self) -> u32 { + self.len + } + + pub fn is_empty(&self) -> bool { + self.len == 0 + } + + pub fn all_linked_users>(&self, user_id1: UserId) -> I { + with_map(|m| { + m.range(self.prefix.create_key(&(user_id1, Principal::from_slice(&[]).into()))..) + .take_while(|(key, _)| key.user_ids().0 == user_id1) + .map(|(key, _)| key.user_ids().1) + .collect() + }) + } +}