diff --git a/docker/entrypoint-unit-tests-devDocker.sh b/docker/entrypoint-unit-tests-devDocker.sh index fee7535f25d..19fcc80768c 100755 --- a/docker/entrypoint-unit-tests-devDocker.sh +++ b/docker/entrypoint-unit-tests-devDocker.sh @@ -55,13 +55,12 @@ echo "Unit Tests" echo "------------------------------------------------------------" # Removing parallel and shuffle for now to maintain stability -python3 manage.py test unittests.test_notifications -v 3 --keepdb --no-input -# python3 manage.py test unittests -v 3 --keepdb --no-input --exclude-tag="non-parallel" || { -# exit 1; -# } -# python3 manage.py test unittests -v 3 --keepdb --no-input --tag="non-parallel" || { -# exit 1; -# } +python3 manage.py test unittests -v 3 --keepdb --no-input --exclude-tag="non-parallel" || { + exit 1; +} +python3 manage.py test unittests -v 3 --keepdb --no-input --tag="non-parallel" || { + exit 1; +} # you can select a single file to "test" unit tests # python3 manage.py test unittests.tools.test_npm_audit_scan_parser.TestNpmAuditParser --keepdb -v 3 diff --git a/dojo/notifications/helper.py b/dojo/notifications/helper.py index f97b18116e3..7c90b9129f7 100644 --- a/dojo/notifications/helper.py +++ b/dojo/notifications/helper.py @@ -46,8 +46,9 @@ def create_notification( engagement: Engagement | None = None, product: Product | None = None, requested_by: Dojo_User | None = None, - reviewers: list[Dojo_User] | list = [], - recipients: list[Dojo_User] | list = [], + reviewers: list[Dojo_User] | list[str] | None = None, + recipients: list[Dojo_User] | list[str] | None = None, + no_users: bool = False, # noqa: FBT001 url: str | None = None, url_api: str | None = None, **kwargs: dict, @@ -60,8 +61,7 @@ def create_notification( module_name, _separator, class_name = notification_manager.rpartition(".") module = importlib.import_module(module_name) notification_manager_class = getattr(module, class_name) - manager_object = notification_manager_class() - manager_object.create_notification( + notification_manager_class().create_notification( event=event, title=title, finding=finding, @@ -71,6 +71,7 @@ def create_notification( requested_by=requested_by, reviewers=reviewers, recipients=recipients, + no_users=no_users, url=url, url_api=url_api, **kwargs, @@ -78,9 +79,10 @@ def create_notification( class NotificationManagerHelpers: + """Common functions for use in the Mangers.""" - def __init__(self, **kwargs) -> None: + def __init__(self, *_args: list, **_kwargs: dict) -> None: self._set_notifications_object() self._set_system_settings() @@ -95,8 +97,8 @@ def _set_system_settings(self) -> None: """Set the system settings object in the class.""" self.system_settings = System_Settings.objects.get() - def _create_description(self, event: str, **kwargs: dict) -> str: - if "description" not in kwargs.keys(): + def _create_description(self, event: str, kwargs: dict) -> str: + if kwargs.get("description") is None: if event == "product_added": kwargs["description"] = _("Product %s has been created successfully.") % kwargs["title"] elif event == "product_type_added": @@ -111,7 +113,7 @@ def _create_notification_message( event: str, user: Dojo_User, notification_type: str, - **kwargs: dict, + kwargs: dict, ) -> str: template = f"notifications/{notification_type}/{event.replace('/', '')}.tpl" kwargs.update({"user": user}) @@ -122,7 +124,7 @@ def _create_notification_message( # kwargs.update({"title": title}) if kwargs.get("description") is None: - kwargs.update({"description": self._create_description(event, **kwargs)}) + kwargs.update({"description": self._create_description(event, kwargs)}) try: notification_message = render_to_string(template, kwargs) @@ -134,7 +136,7 @@ def _create_notification_message( logger.error("error during rendering of template %s exception is %s", template, e) finally: if not notification_message: - kwargs["description"] = self._create_description(event, **kwargs) + kwargs["description"] = self._create_description(event, kwargs) notification_message = render_to_string(f"notifications/{notification_type}/other.tpl", kwargs) return notification_message or "" @@ -161,6 +163,7 @@ def _log_alert( class SlackNotificationManger(NotificationManagerHelpers): + """Manger for slack notifications and their helpers.""" @dojo_async_task @@ -242,7 +245,7 @@ def _post_slack_message(self, event: str, user: Dojo_User, channel: str, **kwarg "token": self.system_settings.slack_token, "channel": channel, "username": self.system_settings.slack_username, - "text": self._create_notification_message(event, user, "slack", **kwargs), + "text": self._create_notification_message(event, user, "slack", kwargs), }, ) @@ -253,11 +256,12 @@ def _post_slack_message(self, event: str, user: Dojo_User, channel: str, **kwarg class MSTeamsNotificationManger(NotificationManagerHelpers): + """Manger for slack notifications and their helpers.""" @dojo_async_task @app.task - def send_msteams_notification(self, event: str, user: Dojo_User, **kwargs: dict): + def send_msteams_notification(self, event: str, user: Dojo_User | None = None, **kwargs: dict): try: # Microsoft Teams doesn't offer direct message functionality, so no MS Teams PM functionality here... if user is None: @@ -266,7 +270,7 @@ def send_msteams_notification(self, event: str, user: Dojo_User, **kwargs: dict) res = requests.request( method="POST", url=self.system_settings.msteams_url, - data=self._create_notification_message(event, None, "msteams", **kwargs), + data=self._create_notification_message(event, None, "msteams", kwargs), ) if res.status_code != 200: logger.error("Error when sending message to Microsoft Teams") @@ -287,14 +291,15 @@ def send_msteams_notification(self, event: str, user: Dojo_User, **kwargs: dict) class EmailNotificationManger(NotificationManagerHelpers): + """Manger for slack notifications and their helpers.""" @dojo_async_task @app.task - def send_mail_notification(self, event: str, user: Dojo_User, **kwargs: dict): + def send_mail_notification(self, event: str, user: Dojo_User | None = None, **kwargs: dict): # Attempt to get the "to" address - if "recipient" in kwargs: - address = kwargs.get("recipient") + if (recipient := kwargs.get("recipient")) is not None: + address = recipient elif user: address = user.email else: @@ -304,12 +309,12 @@ def send_mail_notification(self, event: str, user: Dojo_User, **kwargs: dict): try: subject = f"{self.system_settings.team_name} notification" - if "title" in kwargs: - subject += f": {kwargs['title']}" + if (title := kwargs.get("title")) is not None: + subject += f": {title}" email = EmailMessage( subject, - self._create_notification_message(event, user, "mail", **kwargs), + self._create_notification_message(event, user, "mail", kwargs), self.system_settings.email_from, [address], headers={"From": f"{self.system_settings.email_from}"}, @@ -330,6 +335,7 @@ def send_mail_notification(self, event: str, user: Dojo_User, **kwargs: dict): class WebhookNotificationManger(NotificationManagerHelpers): + """Manger for slack notifications and their helpers.""" ERROR_PERMANENT = "permanent" @@ -337,7 +343,7 @@ class WebhookNotificationManger(NotificationManagerHelpers): @dojo_async_task @app.task - def send_webhooks_notification(self, event: str, user: Dojo_User, **kwargs: dict): + def send_webhooks_notification(self, event: str, user: Dojo_User | None = None, **kwargs: dict): for endpoint in self._get_webhook_endpoints(user=user): error = None if endpoint.status not in [ @@ -386,7 +392,7 @@ def send_webhooks_notification(self, event: str, user: Dojo_User, **kwargs: dict endpoint.first_error = now endpoint.status = Notification_Webhooks.Status.STATUS_INACTIVE_TMP # In case of failure within one day, endpoint can be deactivated temporally only for one minute - self._webhook_reactivation.apply_async(kwargs={"endpoint_id": endpoint.pk}, countdown=60) + self._webhook_reactivation.apply_async(args=[self], kwargs={"endpoint_id": endpoint.pk}, countdown=60) # There is no reason to keep endpoint active if it is returning 4xx errors else: endpoint.status = Notification_Webhooks.Status.STATUS_INACTIVE_PERMANENT @@ -419,7 +425,7 @@ def _webhooks_notification_request( } if endpoint.header_name is not None: headers[endpoint.header_name] = endpoint.header_value - yaml_data = self._create_notification_message(event, endpoint.owner, "webhooks", **kwargs) + yaml_data = self._create_notification_message(event, endpoint.owner, "webhooks", kwargs) data = yaml.safe_load(yaml_data) return requests.request( @@ -437,7 +443,7 @@ def _test_webhooks_notification(self, endpoint: Notification_Webhooks) -> None: # for now, "raise_for_status" should be enough @app.task(ignore_result=True) - def _webhook_reactivation(self, endpoint_id: int, **kwargs): + def _webhook_reactivation(self, endpoint_id: int, **_kwargs: dict): endpoint = Notification_Webhooks.objects.get(pk=endpoint_id) # User already changed status of endpoint if endpoint.status != Notification_Webhooks.Status.STATUS_INACTIVE_TMP: @@ -450,12 +456,11 @@ def _webhook_reactivation(self, endpoint_id: int, **kwargs): class AlertNotificationManger(NotificationManagerHelpers): + """Manger for slack notifications and their helpers.""" - def send_alert_notification(self, event: str, user: Dojo_User, **kwargs): + def send_alert_notification(self, event: str, user: Dojo_User | None = None, **kwargs: dict): logger.debug("sending alert notification to %s", user) - logger.error(f"event: {event}") - logger.error(f"kwargs: {kwargs}") try: # no need to differentiate between user/no user icon = kwargs.get("icon", "info-circle") @@ -466,7 +471,7 @@ def send_alert_notification(self, event: str, user: Dojo_User, **kwargs): alert = Alerts( user_id=user, title=kwargs.get("title")[:250], - description=self._create_notification_message(event, user, "alert", **kwargs)[:2000], + description=self._create_notification_message(event, user, "alert", kwargs)[:2000], url=kwargs.get("url", reverse("alerts")), icon=icon[:25], source=source, @@ -491,26 +496,31 @@ class NotificationManager( EmailNotificationManger, WebhookNotificationManger, AlertNotificationManger, + NotificationManagerHelpers, ): + """Manage the construction and dispatch of notifications.""" + def __init__(self, *args: list, **kwargs: dict) -> None: + NotificationManagerHelpers.__init__(self, *args, **kwargs) + def create_notification(self, event: str | None = None, **kwargs: dict) -> None: # Process the notifications for a given list of recipients - if kwargs.get("recipients"): + if kwargs.get("recipients") is not None: self._process_recipients(event=event, **kwargs) else: logger.debug("creating system notifications for event: %s", event) # send system notifications to all admin users self._process_objects(**kwargs) # System notifications are sent one with user=None, which will trigger email to configured system email, to global slack channel, etc. - self._process_notifications(event=event, notifications=self.system_notifications, **kwargs) + self._process_notifications(event, notifications=self.system_notifications, **kwargs) # All admins will also receive system notifications, but as part of the person global notifications section below # This time user is set, so will trigger email to personal email, to personal slack channel (mention), etc. # only retrieve users which have at least one notification type enabled for this event type. logger.debug("creating personal notifications for event: %s", event) # There are notification like deleting a product type that shall not be sent to users. # These notifications will have the parameter no_users=True - if not ("no_users" in kwargs and kwargs["no_users"] is True): + if kwargs.get("no_users") is False: # get users with either global notifications, or a product specific notification # and all admin/superuser, they will always be notified for user in self._get_user_to_send_notifications_to(): @@ -532,35 +542,35 @@ def _process_recipients(self, event: str | None = None, **kwargs: dict) -> None: ) merged_notifications.user = recipient_notifications.user logger.debug("Sent notification to %s", merged_notifications.user) - self._process_notifications(event=event, notifications=merged_notifications, **kwargs) + self._process_notifications(event, notifications=merged_notifications, **kwargs) else: # Do not trump user preferences and send notifications as usual logger.debug("Sent notification to %s", recipient_notifications.user) - self._process_notifications(event=event, notifications=recipient_notifications, **kwargs) + self._process_notifications(event, notifications=recipient_notifications, **kwargs) def _process_objects(self, **kwargs: dict) -> None: """Extract the product and product type from the kwargs.""" self.product_type: Product_Type = None self.product: Product = None - if "product_type" in kwargs: - self.product_type = kwargs.get("product_type") + if (product_type := kwargs.get("product_type")) is not None: + self.product_type = product_type logger.debug("Defined product type %s", self.product_type) - if "product" in kwargs: - self.product = kwargs.get("product") + if (product := kwargs.get("product")) is not None: + self.product = product logger.debug("Defined product %s", self.product) - elif "engagement" in kwargs: - self.product = kwargs["engagement"].product + elif (engagement := kwargs.get("engagement")) is not None: + self.product = engagement.product logger.debug("Defined product of engagement %s", self.product) - elif "test" in kwargs: - self.product = kwargs["test"].engagement.product + elif (test := kwargs.get("test")) is not None: + self.product = test.engagement.product logger.debug("Defined product of test %s", self.product) - elif "finding" in kwargs: - self.product = kwargs["finding"].test.engagement.product + elif (finding := kwargs.get("finding")) is not None: + self.product = finding.test.engagement.product logger.debug("Defined product of finding %s", self.product) - elif "obj" in kwargs: + elif (obj := kwargs.get("obj")) is not None: from dojo.utils import get_product - self.product = get_product(kwargs["obj"]) + self.product = get_product(obj) logger.debug("Defined product of obj %s", self.product) def _get_user_to_send_notifications_to( @@ -609,11 +619,11 @@ def _send_single_notification_to_user(self, user: Dojo_User, event: str | None = notifications_set = Notifications.merge_notifications_list(applicable_notifications) notifications_set.user = user - self._process_notifications(event=event, notifications=notifications_set, **kwargs) + self._process_notifications(event, notifications=notifications_set, **kwargs) def _process_notifications( self, - event: str | None = None, + event: str | None, notifications: Notifications | None = None, **kwargs: dict, ) -> None: @@ -624,8 +634,6 @@ def _process_notifications( logger.debug("sending notification " + ("asynchronously" if we_want_async() else "synchronously")) logger.debug("process notifications for %s", notifications.user) - logger.error(f"event: {event}") - logger.error(f"kwargs: {kwargs}") if self.system_settings.enable_slack_notifications and "slack" in getattr( notifications, @@ -633,7 +641,7 @@ def _process_notifications( getattr(notifications, "other"), ): logger.debug("Sending Slack Notification") - SlackNotificationManger.send_slack_notification(self, event, notifications.user, **kwargs) + self.send_slack_notification(event, user=notifications.user, **kwargs) if self.system_settings.enable_msteams_notifications and "msteams" in getattr( notifications, @@ -641,7 +649,7 @@ def _process_notifications( getattr(notifications, "other"), ): logger.debug("Sending MSTeams Notification") - MSTeamsNotificationManger.send_msteams_notification(self, event, notifications.user, **kwargs) + self.send_msteams_notification(event, user=notifications.user, **kwargs) if self.system_settings.enable_mail_notifications and "mail" in getattr( notifications, @@ -649,7 +657,7 @@ def _process_notifications( getattr(notifications, "other"), ): logger.debug("Sending Mail Notification") - EmailNotificationManger.send_mail_notification(self, event, notifications.user, **kwargs) + self.send_mail_notification(event, user=notifications.user, **kwargs) if self.system_settings.enable_webhooks_notifications and "webhooks" in getattr( notifications, @@ -657,15 +665,15 @@ def _process_notifications( getattr(notifications, "other"), ): logger.debug("Sending Webhooks Notification") - WebhookNotificationManger.send_webhooks_notification(self, event, notifications.user, **kwargs) + self.send_webhooks_notification(event, user=notifications.user, **kwargs) if "alert" in getattr(notifications, event, getattr(notifications, "other")): logger.debug(f"Sending Alert to {notifications.user}") - self.send_alert_notification(event, notifications.user, **kwargs) + AlertNotificationManger.send_alert_notification(self, event, user=notifications.user, **kwargs) @app.task(ignore_result=True) -def webhook_status_cleanup(*args, **kwargs): +def webhook_status_cleanup(*_args: list, **_kwargs: dict): # If some endpoint was affected by some outage (5xx, 429, Timeout) but it was clean during last 24 hours, # we consider this endpoint as healthy so need to reset it endpoints = Notification_Webhooks.objects.filter( @@ -691,4 +699,4 @@ def webhook_status_cleanup(*args, **kwargs): ) for endpoint in broken_endpoints: manager = WebhookNotificationManger() - manager._webhook_reactivation(endpoint_id=endpoint.pk) + manager._webhook_reactivation(manager, endpoint_id=endpoint.pk) diff --git a/unittests/test_notifications.py b/unittests/test_notifications.py index c88e4c47f9d..1914a86f9c3 100644 --- a/unittests/test_notifications.py +++ b/unittests/test_notifications.py @@ -15,8 +15,8 @@ from dojo.models import ( DEFAULT_NOTIFICATION, Alerts, - Dojo_User, Development_Environment, + Dojo_User, Endpoint, Engagement, Finding, @@ -32,6 +32,7 @@ get_current_datetime, ) from dojo.notifications.helper import ( + AlertNotificationManger, NotificationManager, create_notification, webhook_status_cleanup, @@ -89,7 +90,7 @@ def test_merge_notifications_list(self): self.assertEqual(len(merged_notifications.other), 3) self.assertEqual(merged_notifications.other, {"alert", "mail", "slack"}) - @patch("dojo.notifications.helper.NotificationManager.send_alert_notification", wraps=NotificationManager.send_alert_notification) + @patch("dojo.notifications.helper.AlertNotificationManger.send_alert_notification", wraps=AlertNotificationManger.send_alert_notification) def test_notifications_system_level_trump(self, mock): notif_user, _ = Notifications.objects.get_or_create(user=User.objects.get(username="admin")) notif_system, _ = Notifications.objects.get_or_create(user=None, template=False) @@ -132,7 +133,7 @@ def test_notifications_system_level_trump(self, mock): self.assertEqual(mock.call_count, last_count + 1) last_count = mock.call_count - @patch("dojo.notifications.helper.NotificationManager.send_alert_notification", wraps=NotificationManager.send_alert_notification) + @patch("dojo.notifications.helper.AlertNotificationManger.send_alert_notification", wraps=AlertNotificationManger.send_alert_notification) def test_non_default_other_notifications(self, mock): notif_user, _ = Notifications.objects.get_or_create(user=User.objects.get(username="admin")) notif_system, _ = Notifications.objects.get_or_create(user=None, template=False) @@ -150,7 +151,7 @@ def test_non_default_other_notifications(self, mock): notif_user.save() create_notification(event="dummy_foo_event", title="title_for_dummy_foo_event", description="description_for_dummy_foo_event", recipients=["admin"]) self.assertEqual(mock.call_count, last_count + 1) - self.assertEqual(mock.call_args_list[0].args[0], "dummy_foo_event") + self.assertEqual(mock.call_args_list[0].args[1], "dummy_foo_event") alert = Alerts.objects.get(title="title_for_dummy_foo_event") self.assertEqual(alert.source, "Dummy Foo Event") @@ -530,7 +531,7 @@ def test_webhook_reactivation(self): with self.subTest("active"): wh = Notification_Webhooks.objects.filter(owner=None).first() manager = NotificationManager() - manager._webhook_reactivation(endpoint_id=wh.pk) + manager._webhook_reactivation(manager, endpoint_id=wh.pk) updated_wh = Notification_Webhooks.objects.filter(owner=None).first() self.assertEqual(updated_wh.status, Notification_Webhooks.Status.STATUS_ACTIVE) @@ -549,7 +550,7 @@ def test_webhook_reactivation(self): with self.assertLogs("dojo.notifications.helper", level="DEBUG") as cm: manager = NotificationManager() - manager._webhook_reactivation(endpoint_id=wh.pk) + manager._webhook_reactivation(manager, endpoint_id=wh.pk) updated_wh = Notification_Webhooks.objects.filter(owner=None).first() self.assertEqual(updated_wh.status, Notification_Webhooks.Status.STATUS_ACTIVE_TMP)