From d37dd829f021f2e3791095c1277fb5dc68cd695c Mon Sep 17 00:00:00 2001 From: Uxio Fuentefria Date: Fri, 28 Aug 2020 16:41:53 +0200 Subject: [PATCH] Detect duplicate notifications using Redis --- .../notifications/tasks.py | 45 +++++++++++++++++++ .../notifications/tests/test_tasks.py | 24 ++++++++-- 2 files changed, 66 insertions(+), 3 deletions(-) diff --git a/safe_transaction_service/notifications/tasks.py b/safe_transaction_service/notifications/tasks.py index 8a52b2aed..49984a2ba 100644 --- a/safe_transaction_service/notifications/tasks.py +++ b/safe_transaction_service/notifications/tasks.py @@ -1,7 +1,11 @@ +import pickle from typing import Any, Dict, Optional, Tuple +from django.conf import settings + from celery import app from celery.utils.log import get_task_logger +from redis import Redis from safe_transaction_service.history.models import WebHookType @@ -11,6 +15,12 @@ logger = get_task_logger(__name__) +def get_redis() -> Redis: + if not hasattr(get_redis, 'redis'): + get_redis.redis = Redis.from_url(settings.REDIS_URL) + return get_redis.redis + + def filter_notification(payload: Dict[str, Any]) -> bool: """ :param payload: Notification payload @@ -34,6 +44,33 @@ def filter_notification(payload: Dict[str, Any]) -> bool: return True + +class DuplicateNotification: + def __init__(self, address: Optional[str], payload: Dict[str, Any]): + self.redis = get_redis() + self.address = address + self.payload = payload + self.redis_payload = self._get_redis_payload(address, payload) + + def _get_redis_payload(self, address: Optional[str], payload: Dict[str, Any]): + new_payload = dict(payload) + new_payload['notification_to'] = address + return 'notifications:'.encode() + pickle.dumps(new_payload) + + def is_duplicated(self) -> bool: + """ + :return: True if payload was already notified, False otherwise + """ + return bool(self.redis.get(self.redis_payload)) + + def set_duplicated(self) -> bool: + """ + Stores notification with an expiration time of 5 minutes + :return: + """ + return self.redis.set(self.redis_payload, 1, ex=5 * 60) + + @app.shared_task() def send_notification_task(address: Optional[str], payload: Dict[str, Any]) -> Tuple[int, int]: if not (address and payload): # Both must be present @@ -50,6 +87,14 @@ def send_notification_task(address: Optional[str], payload: Dict[str, Any]) -> T if not (tokens and filter_notification(payload)): return 0 + # Make sure notification has not been sent before + duplicate_notification = DuplicateNotification(address, payload) + if duplicate_notification.is_duplicated(): + logger.info('Duplicated notification about Safe=%s with payload=%s to tokens=%s', address, payload, tokens) + return 0 + + duplicate_notification.set_duplicated() + logger.info('Sending notification about Safe=%s with payload=%s to tokens=%s', address, payload, tokens) success_count, failure_count, invalid_tokens = firebase_client.send_message(tokens, payload) if invalid_tokens: diff --git a/safe_transaction_service/notifications/tests/test_tasks.py b/safe_transaction_service/notifications/tests/test_tasks.py index 5b6addc48..300e7fa1a 100644 --- a/safe_transaction_service/notifications/tests/test_tasks.py +++ b/safe_transaction_service/notifications/tests/test_tasks.py @@ -1,16 +1,34 @@ from django.test import TestCase +from eth_account import Account + from safe_transaction_service.history.models import (MultisigConfirmation, - MultisigTransaction, - WebHookType) + MultisigTransaction) from safe_transaction_service.history.signals import build_webhook_payload from ...history.tests.factories import (MultisigConfirmationFactory, MultisigTransactionFactory) -from ..tasks import filter_notification +from ..tasks import DuplicateNotification, filter_notification class TestViews(TestCase): + def test_duplicate_notification_manager(self): + address = '0x1230B3d59858296A31053C1b8562Ecf89A2f888b' + payload = {'address': '0x1230B3d59858296A31053C1b8562Ecf89A2f888b', + 'type': 'INCOMING_TOKEN', + 'tokenAddress': '0x63704B63Ac04f3a173Dfe677C7e3D330c347CD88', + 'txHash': '0xd8cf5db08e4f3d43660975c8be02a079139a69c42c0ccdd157618aec9bb91b28', + 'value': '50000000000000'} + duplicate_notification = DuplicateNotification(address, payload) + self.assertFalse(duplicate_notification.is_duplicated()) + self.assertFalse(duplicate_notification.is_duplicated()) + duplicate_notification.set_duplicated() + self.assertTrue(duplicate_notification.is_duplicated()) + duplicate_notification_2 = DuplicateNotification(address, {'type': 'Different_payload'}) + self.assertFalse(duplicate_notification_2.is_duplicated()) + duplicate_notification_3 = DuplicateNotification(Account.create().address, payload) + self.assertFalse(duplicate_notification_3.is_duplicated()) + def test_filter_notification(self): multisig_confirmation = MultisigConfirmationFactory() confirmation_notification = build_webhook_payload(MultisigConfirmation, multisig_confirmation)