Skip to content

Commit

Permalink
Push notification registration refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
fanchao committed Sep 4, 2024
1 parent 844b2d4 commit 34bc669
Show file tree
Hide file tree
Showing 7 changed files with 255 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
import org.session.libsignal.utilities.ThreadUtils;
import org.signal.aesgcmprovider.AesGcmProvider;
import org.thoughtcrime.securesms.components.TypingStatusSender;
import org.thoughtcrime.securesms.crypto.KeyPairUtilities;
import org.thoughtcrime.securesms.database.EmojiSearchDatabase;
import org.thoughtcrime.securesms.database.LastSentTimestampCache;
import org.thoughtcrime.securesms.database.LokiAPIDatabase;
Expand All @@ -88,7 +87,7 @@
import org.thoughtcrime.securesms.logging.UncaughtExceptionLogger;
import org.thoughtcrime.securesms.notifications.BackgroundPollWorker;
import org.thoughtcrime.securesms.notifications.DefaultMessageNotifier;
import org.thoughtcrime.securesms.notifications.GroupsPushRegistrationHandler;
import org.thoughtcrime.securesms.notifications.PushRegistrationHandler;
import org.thoughtcrime.securesms.notifications.NotificationChannels;
import org.thoughtcrime.securesms.notifications.OptimizedMessageNotifier;
import org.thoughtcrime.securesms.providers.BlobProvider;
Expand All @@ -112,7 +111,6 @@
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.concurrent.Executors;

Expand Down Expand Up @@ -164,7 +162,7 @@ public class ApplicationContext extends Application implements DefaultLifecycleO
@Inject LastSentTimestampCache lastSentTimestampCache;
@Inject VersionDataFetcher versionDataFetcher;
@Inject
GroupsPushRegistrationHandler groupsPushRegistrationHandler;
PushRegistrationHandler pushRegistrationHandler;
@Inject TokenFetcher tokenFetcher;
CallMessageProcessor callMessageProcessor;
MessagingModuleConfiguration messagingModuleConfiguration;
Expand Down Expand Up @@ -279,7 +277,7 @@ public void onCreate() {
NetworkConstraint networkConstraint = new NetworkConstraint.Factory(this).create();
HTTP.INSTANCE.setConnectedToNetwork(networkConstraint::isMet);

groupsPushRegistrationHandler.run();
pushRegistrationHandler.run();

// add our shortcut debug menu if we are not in a release build
if (BuildConfig.BUILD_TYPE != "release") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@

import java.io.IOException;

import kotlin.Unit;
import kotlinx.coroutines.channels.BufferOverflow;
import kotlinx.coroutines.flow.MutableSharedFlow;
import kotlinx.coroutines.flow.MutableStateFlow;
import kotlinx.coroutines.flow.SharedFlowKt;

/**
* Utility class for working with identity keys.
*
Expand All @@ -56,6 +62,8 @@ public class IdentityKeyUtil {
public static final String LOKI_SEED = "loki_seed";
public static final String HAS_MIGRATED_KEY = "has_migrated_keys";

public static final MutableSharedFlow<Unit> CHANGES = SharedFlowKt.MutableSharedFlow(0, 1, BufferOverflow.DROP_LATEST);

private static SharedPreferences getSharedPreferences(Context context) {
return context.getSharedPreferences(MASTER_SECRET_UTIL_PREFERENCES_NAME, 0);
}
Expand Down Expand Up @@ -158,9 +166,11 @@ public static void save(Context context, String key, String value) {
}

if (!preferencesEditor.commit()) throw new AssertionError("failed to save identity key/value to shared preferences");
CHANGES.tryEmit(Unit.INSTANCE);
}

public static void delete(Context context, String key) {
context.getSharedPreferences(MASTER_SECRET_UTIL_PREFERENCES_NAME, 0).edit().remove(key).commit();
CHANGES.tryEmit(Unit.INSTANCE);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
package org.thoughtcrime.securesms.notifications

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.scan
import kotlinx.coroutines.launch
import network.loki.messenger.libsession_util.GroupInfoConfig
import network.loki.messenger.libsession_util.GroupKeysConfig
import network.loki.messenger.libsession_util.GroupMembersConfig
import org.session.libsession.database.userAuth
import org.session.libsession.messaging.notifications.TokenFetcher
import org.session.libsession.snode.GroupSubAccountSwarmAuth
import org.session.libsession.snode.OwnedSwarmAuth
import org.session.libsession.snode.SwarmAuth
import org.session.libsession.utilities.TextSecurePreferences
import org.session.libsession.utilities.withGroupConfigsOrNull
import org.session.libsignal.utilities.AccountId
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Namespace
import org.thoughtcrime.securesms.crypto.IdentityKeyUtil
import org.thoughtcrime.securesms.database.Storage
import org.thoughtcrime.securesms.dependencies.ConfigFactory
import javax.inject.Inject

private const val TAG = "PushRegistrationHandler"

/**
* A class that listens to the config changes, user's preference and register
* for system level push notification accordingly.
*
* This class only caters for new groups' push registration, and it does not handle
* de-registration of push notification for groups that are removed, as the data needed
* for de-registration can not be guaranteed to be available at the time we discover that
* a group is removed.
*/
class PushRegistrationHandler
@Inject
constructor(
private val pushRegistry: PushRegistryV2,
private val configFactory: ConfigFactory,
private val preferences: TextSecurePreferences,
private val storage: Storage,
private val tokenFetcher: TokenFetcher,
) {
@OptIn(DelicateCoroutinesApi::class)
private val scope: CoroutineScope = GlobalScope

private var job: Job? = null

@OptIn(FlowPreview::class)
fun run() {
require(job == null) { "Job is already running" }

job = scope.launch(Dispatchers.Default) {
combine(
configFactory.configUpdateNotifications
.debounce(500L)
.onStart { emit(Unit) },
IdentityKeyUtil.CHANGES.onStart { emit(Unit) },
preferences.pushEnabled,
tokenFetcher.token,
) { _, _, enabled, token ->
if (!enabled || token.isNullOrEmpty()) {
return@combine emptyMap<SubscriptionKey, Subscription>()
}

val userAuth =
storage.userAuth ?: return@combine emptyMap<SubscriptionKey, Subscription>()
getGroupSubscriptions(
token = token,
userSecretKey = userAuth.ed25519PrivateKey
) + mapOf(
SubscriptionKey(userAuth.accountId, token) to OwnedSubscription(
userAuth,
0
)
)
}
.scan<Map<SubscriptionKey, Subscription>, Pair<Map<SubscriptionKey, Subscription>, Map<SubscriptionKey, Subscription>>?>(
null
) { acc, current ->
val prev = acc?.second.orEmpty()
prev to current
}
.filterNotNull()
.collect { (prev, current) ->
val addedAccountIds = current.keys - prev.keys
val removedAccountIDs = prev.keys - current.keys
if (addedAccountIds.isNotEmpty()) {
Log.d(TAG, "Adding ${addedAccountIds.size} new subscriptions")
}

if (removedAccountIDs.isNotEmpty()) {
Log.d(TAG, "Removing ${removedAccountIDs.size} subscriptions")
}

val deferred = mutableListOf<Deferred<*>>()

addedAccountIds.mapTo(deferred) { key ->
val subscription = current.getValue(key)
async {
try {
subscription.withAuth { auth ->
pushRegistry.register(
token = key.token,
swarmAuth = auth,
namespaces = listOf(subscription.namespace)
)
}
} catch (e: Exception) {
Log.e(TAG, "Failed to register for push notification", e)
}
}
}

removedAccountIDs.mapTo(deferred) { key ->
val subscription = prev.getValue(key)
async {
try {
subscription.withAuth { auth ->
pushRegistry.unregister(
token = key.token,
swarmAuth = auth,
)
}
} catch (e: Exception) {
Log.e(TAG, "Failed to unregister for push notification", e)
}
}
}

deferred.awaitAll()
}
}
}

private fun getGroupSubscriptions(
token: String,
userSecretKey: ByteArray
): Map<SubscriptionKey, Subscription> {
return buildMap {
val groups = configFactory.userGroups?.allClosedGroupInfo().orEmpty()
for (group in groups) {
val adminKey = group.adminKey
if (adminKey != null && adminKey.isNotEmpty()) {
put(
SubscriptionKey(group.groupAccountId, token),
OwnedSubscription(
auth = OwnedSwarmAuth.ofClosedGroup(group.groupAccountId, adminKey),
namespace = Namespace.GROUPS()
)
)
continue
}

val authData = group.authData
if (authData != null && authData.isNotEmpty()) {
val subscription =
configFactory.withGroupConfigsOrNull(group.groupAccountId) { info, members, keys ->
SubAccountSubscription(
authData = authData,
groupInfoConfigDump = info.dump(),
groupMembersConfigDump = members.dump(),
groupKeysConfigDump = keys.dump(),
groupId = group.groupAccountId,
userSecretKey = userSecretKey
)
}

if (subscription != null) {
put(SubscriptionKey(group.groupAccountId, token), subscription)
}
}
}
}
}

private data class SubscriptionKey(
val accountId: AccountId,
val token: String,
)

private sealed interface Subscription {
suspend fun withAuth(cb: suspend (SwarmAuth) -> Unit)
val namespace: Int
}

private class OwnedSubscription(val auth: OwnedSwarmAuth, override val namespace: Int) :
Subscription {
override suspend fun withAuth(cb: suspend (SwarmAuth) -> Unit) {
cb(auth)
}
}

private class SubAccountSubscription(
val groupId: AccountId,
val userSecretKey: ByteArray,
val authData: ByteArray,
val groupInfoConfigDump: ByteArray,
val groupMembersConfigDump: ByteArray,
val groupKeysConfigDump: ByteArray
) : Subscription {
override suspend fun withAuth(cb: suspend (SwarmAuth) -> Unit) {
GroupInfoConfig.newInstance(groupId.pubKeyBytes, initialDump = groupInfoConfigDump)
.use { info ->
GroupMembersConfig.newInstance(
groupId.pubKeyBytes,
initialDump = groupMembersConfigDump
).use { members ->
GroupKeysConfig.newInstance(
userSecretKey = userSecretKey,
groupPublicKey = groupId.pubKeyBytes,
initialDump = groupKeysConfigDump,
info = info,
members = members
).use { keys ->
cb(GroupSubAccountSwarmAuth(keys, groupId, authData))
}
}
}
}

override val namespace: Int
get() = Namespace.GROUPS()
}

}
Loading

0 comments on commit 34bc669

Please sign in to comment.