diff --git a/dojo/apps.py b/dojo/apps.py index aea61e29f83..e12ea7459be 100644 --- a/dojo/apps.py +++ b/dojo/apps.py @@ -72,8 +72,13 @@ def ready(self): # Load any signals here that will be ready for runtime # Importing the signals file is good enough if using the reciever decorator import dojo.announcement.signals # noqa: F401 + import dojo.endpoint.signals # noqa: F401 + import dojo.engagement.signals # noqa: F401 + import dojo.finding_group.signals # noqa: F401 import dojo.product.signals # noqa: F401 + import dojo.product_type.signals # noqa: F401 import dojo.sla_config.helpers # noqa: F401 + import dojo.tags_signals # noqa: F401 import dojo.test.signals # noqa: F401 diff --git a/dojo/endpoint/signals.py b/dojo/endpoint/signals.py new file mode 100644 index 00000000000..23ae7c4d07d --- /dev/null +++ b/dojo/endpoint/signals.py @@ -0,0 +1,30 @@ +from auditlog.models import LogEntry +from django.conf import settings +from django.contrib.contenttypes.models import ContentType +from django.db.models.signals import post_delete +from django.dispatch import receiver +from django.urls import reverse +from django.utils.translation import gettext as _ + +from dojo.models import Endpoint +from dojo.notifications.helper import create_notification + + +@receiver(post_delete, sender=Endpoint) +def endpoint_post_delete(sender, instance, using, origin, **kwargs): + if instance == origin: + if settings.ENABLE_AUDITLOG: + le = LogEntry.objects.get( + action=LogEntry.Action.DELETE, + content_type=ContentType.objects.get(app_label='dojo', model='endpoint'), + object_id=instance.id + ) + description = _('The endpoint "%(name)s" was deleted by %(user)s') % { + 'name': str(instance), 'user': le.actor} + else: + description = _('The endpoint "%(name)s" was deleted') % {'name': str(instance)} + create_notification(event='endpoint_deleted', # template does not exists, it will default to "other" but this event name needs to stay because of unit testing + title=_('Deletion of %(name)s') % {'name': str(instance)}, + description=description, + url=reverse('endpoint'), + icon="exclamation-triangle") diff --git a/dojo/endpoint/views.py b/dojo/endpoint/views.py index acb956bba1e..46a20980061 100644 --- a/dojo/endpoint/views.py +++ b/dojo/endpoint/views.py @@ -21,7 +21,6 @@ from dojo.filters import EndpointFilter, EndpointFilterWithoutObjectLookups from dojo.forms import AddEndpointForm, DeleteEndpointForm, DojoMetaDataForm, EditEndpointForm, ImportEndpointMetaForm from dojo.models import DojoMeta, Endpoint, Endpoint_Status, Finding, Product -from dojo.notifications.helper import create_notification from dojo.utils import ( Product_Tab, add_breadcrumb, @@ -222,12 +221,6 @@ def delete_endpoint(request, eid): messages.SUCCESS, 'Endpoint and relationships removed.', extra_tags='alert-success') - create_notification(event='other', - title=f'Deletion of {endpoint}', - product=product, - description=f'The endpoint "{endpoint}" was deleted by {request.user}', - url=reverse('endpoint'), - icon="exclamation-triangle") return HttpResponseRedirect(reverse('view_product', args=(product.id,))) collector = NestedObjects(using=DEFAULT_DB_ALIAS) diff --git a/dojo/engagement/signals.py b/dojo/engagement/signals.py new file mode 100644 index 00000000000..f8863ee8620 --- /dev/null +++ b/dojo/engagement/signals.py @@ -0,0 +1,57 @@ +from auditlog.models import LogEntry +from django.conf import settings +from django.contrib.contenttypes.models import ContentType +from django.db.models.signals import post_delete, post_save, pre_save +from django.dispatch import receiver +from django.urls import reverse +from django.utils.translation import gettext as _ + +from dojo.models import Engagement +from dojo.notifications.helper import create_notification + + +@receiver(post_save, sender=Engagement) +def engagement_post_save(sender, instance, created, **kwargs): + if created: + title = _('Engagement created for "%(product)s": %(name)s') % {'product': instance.product, 'name': instance.name} + create_notification(event='engagement_added', title=title, engagement=instance, product=instance.product, + url=reverse('view_engagement', args=(instance.id,))) + + +@receiver(pre_save, sender=Engagement) +def engagement_pre_save(sender, instance, **kwargs): + old = sender.objects.filter(pk=instance.pk).first() + if old and instance.status != old.status: + if instance.status in ["Cancelled", "Completed"]: + create_notification(event='engagement_closed', + title=_('Closure of %s') % instance.name, + description=_('The engagement "%s" was closed') % (instance.name), + engagement=instance, url=reverse('engagement_all_findings', args=(instance.id, ))) + elif instance.status in ["In Progress"] and old.status not in ["Not Started"]: + create_notification(event='engagement_reopened', + title=_('Reopening of %s') % instance.name, + engagement=instance, + description=_('The engagement "%s" was reopened') % (instance.name), + url=reverse('view_engagement', args=(instance.id, ))) + + +@receiver(post_delete, sender=Engagement) +def engagement_post_delete(sender, instance, using, origin, **kwargs): + if instance == origin: + if settings.ENABLE_AUDITLOG: + le = LogEntry.objects.get( + action=LogEntry.Action.DELETE, + content_type=ContentType.objects.get(app_label='dojo', model='engagement'), + object_id=instance.id + ) + description = _('The engagement "%(name)s" was deleted by %(user)s') % { + 'name': instance.name, 'user': le.actor} + else: + description = _('The engagement "%(name)s" was deleted') % {'name': instance.name} + create_notification(event='engagement_deleted', # template does not exists, it will default to "other" but this event name needs to stay because of unit testing + title=_('Deletion of %(name)s') % {'name': instance.name}, + description=description, + product=instance.product, + url=reverse('view_product', args=(instance.product.id, )), + recipients=[instance.lead], + icon="exclamation-triangle") diff --git a/dojo/engagement/views.py b/dojo/engagement/views.py index 5ac3340fcc3..9e045166fda 100644 --- a/dojo/engagement/views.py +++ b/dojo/engagement/views.py @@ -279,10 +279,6 @@ def edit_engagement(request, eid): engagement = form.save(commit=False) if (new_status == "Cancelled" or new_status == "Completed"): engagement.active = False - create_notification(event='close_engagement', - title=f'Closure of {engagement.name}', - description=f'The engagement "{engagement.name}" was closed', - engagement=engagement, url=reverse('engagement_all_findings', args=(engagement.id, ))), else: engagement.active = True engagement.save() @@ -361,14 +357,6 @@ def delete_engagement(request, eid): messages.SUCCESS, message, extra_tags='alert-success') - create_notification(event='other', - title=f'Deletion of {engagement.name}', - product=product, - description=f'The engagement "{engagement.name}" was deleted by {request.user}', - url=request.build_absolute_uri(reverse('view_engagements', args=(product.id, ))), - recipients=[engagement.lead], - icon="exclamation-triangle") - return HttpResponseRedirect(reverse("view_engagements", args=(product.id, ))) rels = ['Previewing the relationships has been disabled.', ''] @@ -404,8 +392,8 @@ def copy_engagement(request, eid): messages.SUCCESS, 'Engagement Copied successfully.', extra_tags='alert-success') - create_notification(event='other', - title=f'Copying of {engagement.name}', + create_notification(event='engagement_copied', # TODO - if 'copy' functionality will be supported by API as well, 'create_notification' needs to be migrated to place where it will be able to cover actions from both interfaces + title=_('Copying of %s') % engagement.name, description=f'The engagement "{engagement.name}" was copied by {request.user}', product=product, url=request.build_absolute_uri(reverse('view_engagement', args=(engagement_copy.id, ))), @@ -1138,10 +1126,6 @@ def close_eng(request, eid): messages.SUCCESS, 'Engagement closed successfully.', extra_tags='alert-success') - create_notification(event='close_engagement', - title=f'Closure of {eng.name}', - description=f'The engagement "{eng.name}" was closed', - engagement=eng, url=reverse('engagement_all_findings', args=(eng.id, ))), return HttpResponseRedirect(reverse("view_engagements", args=(eng.product.id, ))) @@ -1154,11 +1138,6 @@ def reopen_eng(request, eid): messages.SUCCESS, 'Engagement reopened successfully.', extra_tags='alert-success') - create_notification(event='other', - title=f'Reopening of {eng.name}', - engagement=eng, - description=f'The engagement "{eng.name}" was reopened', - url=reverse('view_engagement', args=(eng.id, ))), return HttpResponseRedirect(reverse("view_engagements", args=(eng.product.id, ))) diff --git a/dojo/finding/views.py b/dojo/finding/views.py index 44dcce6beab..d54baafb40c 100644 --- a/dojo/finding/views.py +++ b/dojo/finding/views.py @@ -22,6 +22,7 @@ from django.urls import reverse from django.utils import formats, timezone from django.utils.safestring import mark_safe +from django.utils.translation import gettext as _ from django.views import View from django.views.decorators.http import require_POST from imagekit import ImageSpec @@ -117,7 +118,7 @@ get_system_setting, get_words_for_field, match_finding_to_existing_findings, - process_notifications, + process_tag_notifications, redirect, redirect_to_return_url_or_else, reopen_external_issue, @@ -716,7 +717,7 @@ def process_form(self, request: HttpRequest, finding: Finding, context: dict): reverse("view_finding", args=(finding.id,)) ) title = f"Finding: {finding.title}" - process_notifications(request, new_note, url, title) + process_tag_notifications(request, new_note, url, title) # Add a message to the request messages.add_message( request, messages.SUCCESS, "Note saved.", extra_tags="alert-success" @@ -1169,9 +1170,14 @@ def process_form(self, request: HttpRequest, finding: Finding, context: dict): "Finding deleted successfully.", extra_tags="alert-success", ) + + # Note: this notification has not be moved to "@receiver(post_delete, sender=Finding)" method as many other notifications + # Because it could generate too much noise, we keep it here only for findings created by hand in WebUI + # TODO: but same should be implemented for API endpoint + # Send a notification that the finding had been deleted create_notification( - event="other", + event="finding_deleted", title=f"Deletion of {finding.title}", description=f'The finding "{finding.title}" was deleted by {request.user}', product=product, @@ -1288,9 +1294,14 @@ def close_finding(request, fid): "Finding closed.", extra_tags="alert-success", ) + + # Note: this notification has not be moved to "@receiver(pre_save, sender=Finding)" method as many other notifications + # Because it could generate too much noise, we keep it here only for findings created by hand in WebUI + # TODO: but same should be implemented for API endpoint + create_notification( - event="other", - title=f"Closing of {finding.title}", + event="finding_closed", + title=_("Closing of %s") % finding.title, finding=finding, description=f'The finding "{finding.title}" was closed by {request.user}', url=reverse("view_finding", args=(finding.id,)), @@ -1451,9 +1462,14 @@ def reopen_finding(request, fid): messages.add_message( request, messages.SUCCESS, "Finding Reopened.", extra_tags="alert-success" ) + + # Note: this notification has not be moved to "@receiver(pre_save, sender=Finding)" method as many other notifications + # Because it could generate too much noise, we keep it here only for findings created by hand in WebUI + # TODO: but same should be implemented for API endpoint + create_notification( - event="other", - title=f"Reopening of {finding.title}", + event="finding_reopened", + title=_("Reopening of %s") % finding.title, finding=finding, description=f'The finding "{finding.title}" was reopened by {request.user}', url=reverse("view_finding", args=(finding.id,)), @@ -1510,8 +1526,8 @@ def copy_finding(request, fid): extra_tags="alert-success", ) create_notification( - event="other", - title=f"Copying of {finding.title}", + event="finding_copied", # TODO - if 'copy' functionality will be supported by API as well, 'create_notification' needs to be migrated to place where it will be able to cover actions from both interfaces + title=_("Copying of %s") % finding.title, description=f'The finding "{finding.title}" was copied by {request.user} to {test.title}', product=product, url=request.build_absolute_uri( @@ -1686,7 +1702,7 @@ def request_finding_review(request, fid): logger.debug(f"Asking {reviewers_string} for review") create_notification( - event="review_requested", + event="review_requested", # TODO - if 'review_requested' functionality will be supported by API as well, 'create_notification' needs to be migrated to place where it will be able to cover actions from both interfaces title="Finding review requested", requested_by=user, note=new_note, diff --git a/dojo/finding_group/signals.py b/dojo/finding_group/signals.py new file mode 100644 index 00000000000..1dc0e339e90 --- /dev/null +++ b/dojo/finding_group/signals.py @@ -0,0 +1,31 @@ +from auditlog.models import LogEntry +from django.conf import settings +from django.contrib.contenttypes.models import ContentType +from django.db.models.signals import post_delete +from django.dispatch import receiver +from django.urls import reverse +from django.utils.translation import gettext as _ + +from dojo.models import Finding_Group +from dojo.notifications.helper import create_notification + + +@receiver(post_delete, sender=Finding_Group) +def finding_group_post_delete(sender, instance, using, origin, **kwargs): + if instance == origin: + if settings.ENABLE_AUDITLOG: + le = LogEntry.objects.get( + action=LogEntry.Action.DELETE, + content_type=ContentType.objects.get(app_label='dojo', model='finding_group'), + object_id=instance.id + ) + description = _('The finding group "%(name)s" was deleted by %(user)s') % { + 'name': instance.name, 'user': le.actor} + else: + description = _('The finding group "%(name)s" was deleted') % {'name': instance.name} + create_notification(event='finding_group_deleted', # template does not exists, it will default to "other" but this event name needs to stay because of unit testing + title=_('Deletion of %(name)s') % {'name': instance.name}, + description=description, + product=instance.test.engagement.product, + url=reverse('view_test', args=(instance.test.id, )), + icon="exclamation-triangle") diff --git a/dojo/finding_group/views.py b/dojo/finding_group/views.py index 91737456952..b22c75d0e70 100644 --- a/dojo/finding_group/views.py +++ b/dojo/finding_group/views.py @@ -16,7 +16,6 @@ from dojo.finding.views import prefetch_for_findings from dojo.forms import DeleteFindingGroupForm, EditFindingGroupForm, FindingBulkUpdateForm from dojo.models import Engagement, Finding, Finding_Group, GITHUB_PKey, Product -from dojo.notifications.helper import create_notification from dojo.utils import Product_Tab, add_breadcrumb, get_page_items, get_system_setting, get_words_for_field logger = logging.getLogger(__name__) @@ -115,19 +114,11 @@ def delete_finding_group(request, fgid): if 'id' in request.POST and str(finding_group.id) == request.POST['id']: form = DeleteFindingGroupForm(request.POST, instance=finding_group) if form.is_valid(): - product = finding_group.test.engagement.product finding_group.delete() messages.add_message(request, messages.SUCCESS, 'Finding Group and relationships removed.', extra_tags='alert-success') - - create_notification(event='other', - title=f'Deletion of {finding_group.name}', - product=product, - description=f'The finding group "{finding_group.name}" was deleted by {request.user}', - url=request.build_absolute_uri(reverse('view_test', args=(finding_group.test.id,))), - icon="exclamation-triangle") return HttpResponseRedirect(reverse('view_test', args=(finding_group.test.id,))) collector = NestedObjects(using=DEFAULT_DB_ALIAS) diff --git a/dojo/jira_link/views.py b/dojo/jira_link/views.py index 64abdf50eab..4e2a033305f 100644 --- a/dojo/jira_link/views.py +++ b/dojo/jira_link/views.py @@ -239,10 +239,10 @@ def check_for_and_create_comment(parsed_json): findings = None if jissue.finding: findings = [jissue.finding] - create_notification(event='other', title=f'JIRA incoming comment - {jissue.finding}', finding=jissue.finding, url=reverse("view_finding", args=(jissue.finding.id,)), icon='check') + create_notification(event='jira_comment', title=f'JIRA incoming comment - {jissue.finding}', finding=jissue.finding, url=reverse("view_finding", args=(jissue.finding.id,)), icon='check') elif jissue.finding_group: findings = [jissue.finding_group.findings.all()] - create_notification(event='other', title=f'JIRA incoming comment - {jissue.finding}', finding=jissue.finding, url=reverse("view_finding_group", args=(jissue.finding_group.id,)), icon='check') + create_notification(event='jira_comment', title=f'JIRA incoming comment - {jissue.finding}', finding=jissue.finding, url=reverse("view_finding_group", args=(jissue.finding_group.id,)), icon='check') elif jissue.engagement: return webhook_responser_handler("debug", "Comment for engagement ignored") else: @@ -378,7 +378,7 @@ def post(self, request): 'JIRA Configuration Successfully Created.', extra_tags='alert-success') create_notification( - event='other', + event='jira_config_added', title=f"New addition of JIRA: {jform.cleaned_data.get('configuration_name')}", description=f"JIRA \"{jform.cleaned_data.get('configuration_name')}\" was added by {request.user}", url=request.build_absolute_uri(reverse('jira'))) @@ -423,7 +423,7 @@ def post(self, request): 'JIRA Configuration Successfully Created.', extra_tags='alert-success') create_notification( - event='other', + event='jira_config_added', title=f"New addition of JIRA: {jform.cleaned_data.get('configuration_name')}", description=f"JIRA \"{jform.cleaned_data.get('configuration_name')}\" was added by {request.user}", url=request.build_absolute_uri(reverse('jira'))) @@ -478,7 +478,7 @@ def post(self, request, jid=None): 'JIRA Configuration Successfully Saved.', extra_tags='alert-success') create_notification( - event='other', + event='jira_config_edited', title=f"Edit of JIRA: {jform.cleaned_data.get('configuration_name')}", description=f"JIRA \"{jform.cleaned_data.get('configuration_name')}\" was edited by {request.user}", url=request.build_absolute_uri(reverse('jira'))) @@ -540,8 +540,8 @@ def post(self, request, tid=None): 'JIRA Conf and relationships removed.', extra_tags='alert-success') create_notification( - event='other', - title=f'Deletion of JIRA: {jira_instance.configuration_name}', + event='jira_config_deleted', + title=_('Deletion of JIRA: %s') % jira_instance.configuration_name, description=f"JIRA \"{jira_instance.configuration_name}\" was deleted by {request.user}", url=request.build_absolute_uri(reverse('jira'))) return HttpResponseRedirect(reverse('jira')) diff --git a/dojo/middleware.py b/dojo/middleware.py index 59a61eba81a..3b8e641646c 100644 --- a/dojo/middleware.py +++ b/dojo/middleware.py @@ -3,10 +3,13 @@ from threading import local from urllib.parse import quote +from auditlog.context import set_actor +from auditlog.middleware import AuditlogMiddleware as _AuditlogMiddleware from django.conf import settings from django.db import models from django.http import HttpResponseRedirect from django.urls import reverse +from django.utils.functional import SimpleLazyObject logger = logging.getLogger(__name__) @@ -164,3 +167,17 @@ def __init__(self, get_response): def __call__(self, request): request.META.update(settings.ADDITIONAL_HEADERS) return self.get_response(request) + + +# This solution comes from https://github.com/jazzband/django-auditlog/issues/115#issuecomment-1539262735 +# It fix situation when TokenAuthentication is used in API. Otherwise, actor in AuditLog would be set to None +class AuditlogMiddleware(_AuditlogMiddleware): + def __call__(self, request): + remote_addr = self._get_remote_addr(request) + + user = SimpleLazyObject(lambda: getattr(request, "user", None)) + + context = set_actor(actor=user, remote_addr=remote_addr) + + with context: + return self.get_response(request) diff --git a/dojo/models.py b/dojo/models.py index 53149c8389e..415bc6b4567 100644 --- a/dojo/models.py +++ b/dojo/models.py @@ -4561,6 +4561,7 @@ def __str__(self): auditlog.register(Endpoint) auditlog.register(Engagement) auditlog.register(Finding) + auditlog.register(Finding_Group) auditlog.register(Product_Type) auditlog.register(Product) auditlog.register(Test) diff --git a/dojo/notifications/helper.py b/dojo/notifications/helper.py index 4f4d16b8a12..9126292e233 100644 --- a/dojo/notifications/helper.py +++ b/dojo/notifications/helper.py @@ -2,6 +2,7 @@ import requests from django.conf import settings +from django.core.exceptions import FieldDoesNotExist from django.core.mail import EmailMessage from django.db.models import Count, Prefetch, Q from django.template import TemplateDoesNotExist @@ -170,20 +171,20 @@ def process_notifications(event, notifications=None, **kwargs): msteams_enabled = get_system_setting('enable_msteams_notifications') mail_enabled = get_system_setting('enable_mail_notifications') - if slack_enabled and 'slack' in getattr(notifications, event): + if slack_enabled and 'slack' in getattr(notifications, event, getattr(notifications, 'other')): logger.debug('Sending Slack Notification') send_slack_notification(event, notifications.user, **kwargs) - if msteams_enabled and 'msteams' in getattr(notifications, event): + if msteams_enabled and 'msteams' in getattr(notifications, event, getattr(notifications, 'other')): logger.debug('Sending MSTeams Notification') send_msteams_notification(event, notifications.user, **kwargs) - if mail_enabled and 'mail' in getattr(notifications, event): + if mail_enabled and 'mail' in getattr(notifications, event, getattr(notifications, 'other')): logger.debug('Sending Mail Notification') send_mail_notification(event, notifications.user, **kwargs) - if 'alert' in getattr(notifications, event, None): - logger.debug('Sending Alert') + if 'alert' in getattr(notifications, event, getattr(notifications, 'other')): + logger.debug(f'Sending Alert to {notifications.user}') send_alert_notification(event, notifications.user, **kwargs) @@ -314,13 +315,17 @@ def send_alert_notification(event, user=None, *args, **kwargs): try: # no need to differentiate between user/no user icon = kwargs.get('icon', 'info-circle') + try: + source = Notifications._meta.get_field(event).verbose_name.title()[:100] + except FieldDoesNotExist: + source = event.replace("_", " ").title()[:100] alert = Alerts( user_id=user, title=kwargs.get('title')[:250], description=create_notification_message(event, user, 'alert', *args, **kwargs)[:2000], url=kwargs.get('url', reverse('alerts')), icon=icon[:25], - source=Notifications._meta.get_field(event).verbose_name.title()[:100] + source=source, ) # relative urls will fail validation alert.clean_fields(exclude=['url']) diff --git a/dojo/product/signals.py b/dojo/product/signals.py index 10869a5f282..4ae3053b5f2 100644 --- a/dojo/product/signals.py +++ b/dojo/product/signals.py @@ -1,79 +1,38 @@ -import contextlib -import logging - -from django.db.models import signals +from auditlog.models import LogEntry +from django.conf import settings +from django.contrib.contenttypes.models import ContentType +from django.db.models.signals import post_delete, post_save from django.dispatch import receiver - -from dojo.models import Endpoint, Engagement, Finding, Product, Test -from dojo.product import helpers as async_product_funcs -from dojo.utils import get_system_setting - -logger = logging.getLogger(__name__) - - -@receiver(signals.m2m_changed, sender=Product.tags.through) -def product_tags_post_add_remove(sender, instance, action, **kwargs): - if action in ["post_add", "post_remove"]: - running_async_process = False - with contextlib.suppress(AttributeError): - running_async_process = instance.running_async_process - # Check if the async process is already running to avoid calling it a second time - if not running_async_process and inherit_product_tags(instance): - async_product_funcs.propagate_tags_on_product(instance.id, countdown=5) - instance.running_async_process = True - - -@receiver(signals.m2m_changed, sender=Endpoint.tags.through) -@receiver(signals.m2m_changed, sender=Engagement.tags.through) -@receiver(signals.m2m_changed, sender=Test.tags.through) -@receiver(signals.m2m_changed, sender=Finding.tags.through) -def make_inherited_tags_sticky(sender, instance, action, **kwargs): - if action in ["post_add", "post_remove"]: - if inherit_product_tags(instance): - tag_list = [tag.name for tag in instance.tags.all()] - if propagate_inheritance(instance, tag_list=tag_list): - instance.inherit_tags(tag_list) - - -@receiver(signals.post_save, sender=Endpoint) -@receiver(signals.post_save, sender=Engagement) -@receiver(signals.post_save, sender=Test) -@receiver(signals.post_save, sender=Finding) -def inherit_tags_on_instance(sender, instance, created, **kwargs): - if inherit_product_tags(instance): - tag_list = instance._tags_tagulous.get_tag_list() - if propagate_inheritance(instance, tag_list=tag_list): - instance.inherit_tags(tag_list) - - -def propagate_inheritance(instance, tag_list=[]): - # Get the expected product tags - product_inherited_tags = [tag.name for tag in get_product(instance).tags.all()] - existing_inherited_tags = [tag.name for tag in instance.inherited_tags.all()] - # Check if product tags already matches inherited tags - product_tags_equals_inherited_tags = product_inherited_tags == existing_inherited_tags - # Check if product tags have already been inherited - tags_have_already_been_inherited = set(product_inherited_tags) <= set(tag_list) - return not (product_tags_equals_inherited_tags and tags_have_already_been_inherited) - - -def inherit_product_tags(instance) -> bool: - product = get_product(instance) - # Save a read in the db - if product and product.enable_product_tag_inheritance: - return True - - return get_system_setting('enable_product_tag_inheritance') - - -def get_product(instance): - if isinstance(instance, Product): - return instance - if isinstance(instance, Endpoint): - return instance.product - if isinstance(instance, Engagement): - return instance.product - if isinstance(instance, Test): - return instance.engagement.product - if isinstance(instance, Finding): - return instance.test.engagement.product +from django.urls import reverse +from django.utils.translation import gettext as _ + +from dojo.models import Product +from dojo.notifications.helper import create_notification + + +@receiver(post_save, sender=Product) +def product_post_save(sender, instance, created, **kwargs): + if created: + create_notification(event='product_added', + title=instance.name, + product=instance, + url=reverse('view_product', args=(instance.id,))) + + +@receiver(post_delete, sender=Product) +def product_post_delete(sender, instance, **kwargs): + if settings.ENABLE_AUDITLOG: + le = LogEntry.objects.get( + action=LogEntry.Action.DELETE, + content_type=ContentType.objects.get(app_label='dojo', model='product'), + object_id=instance.id + ) + description = _('The product "%(name)s" was deleted by %(user)s') % { + 'name': instance.name, 'user': le.actor} + else: + description = _('The product "%(name)s" was deleted') % {'name': instance.name} + create_notification(event='product_deleted', # template does not exists, it will default to "other" but this event name needs to stay because of unit testing + title=_('Deletion of %(name)s') % {'name': instance.name}, + description=description, + url=reverse('product'), + icon="exclamation-triangle") diff --git a/dojo/product/views.py b/dojo/product/views.py index e98d471ad62..580bb2c6442 100644 --- a/dojo/product/views.py +++ b/dojo/product/views.py @@ -91,7 +91,6 @@ Test, Test_Type, ) -from dojo.notifications.helper import create_notification from dojo.product.queries import ( get_authorized_groups_for_product, get_authorized_members_for_product, @@ -903,10 +902,6 @@ def new_product(request, ptid=None): except: logger.info('Labels cannot be created - they may already exists') - create_notification(event='product_added', title=product.name, - product=product, - url=reverse('view_product', args=(product.id,))) - if not error: return HttpResponseRedirect(reverse('view_product', args=(product.id,))) else: @@ -1022,7 +1017,6 @@ def delete_product(request, pid): if 'id' in request.POST and str(product.id) == request.POST['id']: form = DeleteProductForm(request.POST, instance=product) if form.is_valid(): - product_type = product.prod_type if get_setting("ASYNC_OBJECT_DELETE"): async_del = async_delete() async_del.delete(product) @@ -1034,13 +1028,6 @@ def delete_product(request, pid): messages.SUCCESS, message, extra_tags='alert-success') - create_notification(event='other', - title=_('Deletion of %(name)s') % {'name': product.name}, - product_type=product_type, - description=_('The product "%(name)s" was deleted by %(user)s') % { - 'name': product.name, 'user': request.user}, - url=reverse('product'), - icon="exclamation-triangle") logger.debug('delete_product: POST RETURN') return HttpResponseRedirect(reverse('product')) else: diff --git a/dojo/product_type/signals.py b/dojo/product_type/signals.py new file mode 100644 index 00000000000..65a06c1284c --- /dev/null +++ b/dojo/product_type/signals.py @@ -0,0 +1,39 @@ +from auditlog.models import LogEntry +from django.conf import settings +from django.contrib.contenttypes.models import ContentType +from django.db.models.signals import post_delete, post_save +from django.dispatch import receiver +from django.urls import reverse +from django.utils.translation import gettext as _ + +from dojo.models import Product_Type +from dojo.notifications.helper import create_notification + + +@receiver(post_save, sender=Product_Type) +def product_type_post_save(sender, instance, created, **kwargs): + if created: + create_notification(event='product_type_added', + title=instance.name, + product_type=instance, + url=reverse('view_product_type', args=(instance.id,))) + + +@receiver(post_delete, sender=Product_Type) +def product_type_post_delete(sender, instance, **kwargs): + if settings.ENABLE_AUDITLOG: + le = LogEntry.objects.get( + action=LogEntry.Action.DELETE, + content_type=ContentType.objects.get(app_label='dojo', model='product_type'), + object_id=instance.id + ) + description = _('The product type "%(name)s" was deleted by %(user)s') % { + 'name': instance.name, 'user': le.actor} + else: + description = _('The product type "%(name)s" was deleted') % {'name': instance.name} + create_notification(event='product_type_deleted', # template does not exists, it will default to "other" but this event name needs to stay because of unit testing + title=_('Deletion of %(name)s') % {'name': instance.name}, + description=description, + no_users=True, + url=reverse('product_type'), + icon="exclamation-triangle") diff --git a/dojo/product_type/views.py b/dojo/product_type/views.py index bb41a1cd63b..efa46f73a8d 100644 --- a/dojo/product_type/views.py +++ b/dojo/product_type/views.py @@ -25,14 +25,19 @@ Product_TypeForm, ) from dojo.models import Product_Type, Product_Type_Group, Product_Type_Member, Role -from dojo.notifications.helper import create_notification from dojo.product.queries import get_authorized_products from dojo.product_type.queries import ( get_authorized_groups_for_product_type, get_authorized_members_for_product_type, get_authorized_product_types, ) -from dojo.utils import add_breadcrumb, async_delete, get_page_items, get_setting, is_title_in_breadcrumbs +from dojo.utils import ( + add_breadcrumb, + async_delete, + get_page_items, + get_setting, + is_title_in_breadcrumbs, +) logger = logging.getLogger(__name__) @@ -98,9 +103,6 @@ def add_product_type(request): messages.SUCCESS, _('Product type added successfully.'), extra_tags='alert-success') - create_notification(event='product_type_added', title=product_type.name, - product_type=product_type, - url=reverse('view_product_type', args=(product_type.id,))) return HttpResponseRedirect(reverse('product_type')) add_breadcrumb(title=page_name, top_level=False, request=request) @@ -147,12 +149,6 @@ def delete_product_type(request, ptid): messages.SUCCESS, message, extra_tags='alert-success') - create_notification(event='other', - title=f'Deletion of {product_type.name}', - no_users=True, - description=f'The product type "{product_type.name}" was deleted by {request.user}', - url=request.build_absolute_uri(reverse('product_type')), - icon="exclamation-triangle") return HttpResponseRedirect(reverse('product_type')) rels = [_('Previewing the relationships has been disabled.'), ''] diff --git a/dojo/settings/settings.dist.py b/dojo/settings/settings.dist.py index 21130b00ac5..e1b4ad12ad5 100644 --- a/dojo/settings/settings.dist.py +++ b/dojo/settings/settings.dist.py @@ -857,7 +857,7 @@ def generate_url(scheme, double_slashes, user, password, host, port, path, param 'dojo.middleware.AdditionalHeaderMiddleware', 'social_django.middleware.SocialAuthExceptionMiddleware', 'watson.middleware.SearchContextMiddleware', - 'auditlog.middleware.AuditlogMiddleware', + 'dojo.middleware.AuditlogMiddleware', 'crum.CurrentRequestUserMiddleware', 'dojo.request_cache.middleware.RequestCacheMiddleware', ] diff --git a/dojo/tags_signals.py b/dojo/tags_signals.py new file mode 100644 index 00000000000..10869a5f282 --- /dev/null +++ b/dojo/tags_signals.py @@ -0,0 +1,79 @@ +import contextlib +import logging + +from django.db.models import signals +from django.dispatch import receiver + +from dojo.models import Endpoint, Engagement, Finding, Product, Test +from dojo.product import helpers as async_product_funcs +from dojo.utils import get_system_setting + +logger = logging.getLogger(__name__) + + +@receiver(signals.m2m_changed, sender=Product.tags.through) +def product_tags_post_add_remove(sender, instance, action, **kwargs): + if action in ["post_add", "post_remove"]: + running_async_process = False + with contextlib.suppress(AttributeError): + running_async_process = instance.running_async_process + # Check if the async process is already running to avoid calling it a second time + if not running_async_process and inherit_product_tags(instance): + async_product_funcs.propagate_tags_on_product(instance.id, countdown=5) + instance.running_async_process = True + + +@receiver(signals.m2m_changed, sender=Endpoint.tags.through) +@receiver(signals.m2m_changed, sender=Engagement.tags.through) +@receiver(signals.m2m_changed, sender=Test.tags.through) +@receiver(signals.m2m_changed, sender=Finding.tags.through) +def make_inherited_tags_sticky(sender, instance, action, **kwargs): + if action in ["post_add", "post_remove"]: + if inherit_product_tags(instance): + tag_list = [tag.name for tag in instance.tags.all()] + if propagate_inheritance(instance, tag_list=tag_list): + instance.inherit_tags(tag_list) + + +@receiver(signals.post_save, sender=Endpoint) +@receiver(signals.post_save, sender=Engagement) +@receiver(signals.post_save, sender=Test) +@receiver(signals.post_save, sender=Finding) +def inherit_tags_on_instance(sender, instance, created, **kwargs): + if inherit_product_tags(instance): + tag_list = instance._tags_tagulous.get_tag_list() + if propagate_inheritance(instance, tag_list=tag_list): + instance.inherit_tags(tag_list) + + +def propagate_inheritance(instance, tag_list=[]): + # Get the expected product tags + product_inherited_tags = [tag.name for tag in get_product(instance).tags.all()] + existing_inherited_tags = [tag.name for tag in instance.inherited_tags.all()] + # Check if product tags already matches inherited tags + product_tags_equals_inherited_tags = product_inherited_tags == existing_inherited_tags + # Check if product tags have already been inherited + tags_have_already_been_inherited = set(product_inherited_tags) <= set(tag_list) + return not (product_tags_equals_inherited_tags and tags_have_already_been_inherited) + + +def inherit_product_tags(instance) -> bool: + product = get_product(instance) + # Save a read in the db + if product and product.enable_product_tag_inheritance: + return True + + return get_system_setting('enable_product_tag_inheritance') + + +def get_product(instance): + if isinstance(instance, Product): + return instance + if isinstance(instance, Endpoint): + return instance.product + if isinstance(instance, Engagement): + return instance.product + if isinstance(instance, Test): + return instance.engagement.product + if isinstance(instance, Finding): + return instance.test.engagement.product diff --git a/dojo/test/signals.py b/dojo/test/signals.py index 6cfb95a4a3f..47d4fdffb8e 100644 --- a/dojo/test/signals.py +++ b/dojo/test/signals.py @@ -1,15 +1,40 @@ import contextlib -import logging -from django.db.models import signals +from auditlog.models import LogEntry +from django.conf import settings +from django.contrib.contenttypes.models import ContentType +from django.db.models.signals import post_delete, pre_save from django.dispatch import receiver +from django.urls import reverse +from django.utils.translation import gettext as _ from dojo.models import Finding, Test +from dojo.notifications.helper import create_notification -logger = logging.getLogger(__name__) +@receiver(post_delete, sender=Test) +def test_post_delete(sender, instance, using, origin, **kwargs): + if instance == origin: + if settings.ENABLE_AUDITLOG: + le = LogEntry.objects.get( + action=LogEntry.Action.DELETE, + content_type=ContentType.objects.get(app_label='dojo', model='test'), + object_id=instance.id + ) + description = _('The test "%(name)s" was deleted by %(user)s') % { + 'name': str(instance), 'user': le.actor} + else: + description = _('The test "%(name)s" was deleted') % {'name': str(instance)} + create_notification(event='test_deleted', # template does not exists, it will default to "other" but this event name needs to stay because of unit testing + title=_('Deletion of %(name)s') % {'name': str(instance)}, + description=description, + product=instance.engagement.product, + url=reverse('view_engagement', args=(instance.engagement.id, )), + recipients=[instance.engagement.lead], + icon="exclamation-triangle") -@receiver(signals.pre_save, sender=Test) + +@receiver(pre_save, sender=Test) def update_found_by_for_findings(sender, instance, **kwargs): with contextlib.suppress(sender.DoesNotExist): obj = sender.objects.get(pk=instance.pk) diff --git a/dojo/test/views.py b/dojo/test/views.py index 7eee27829e6..43c649503d4 100644 --- a/dojo/test/views.py +++ b/dojo/test/views.py @@ -75,7 +75,7 @@ get_setting, get_system_setting, get_words_for_field, - process_notifications, + process_tag_notifications, redirect_to_return_url_or_else, ) @@ -230,7 +230,7 @@ def process_form(self, request: HttpRequest, test: Test, context: dict): # Make a notification for this actions url = request.build_absolute_uri(reverse("view_test", args=(test.id,))) title = f"Test: {test.test_type.name} on {test.engagement.product.name}" - process_notifications(request, new_note, url, title) + process_tag_notifications(request, new_note, url, title) messages.add_message( request, messages.SUCCESS, @@ -322,7 +322,6 @@ def delete_test(request, tid): if 'id' in request.POST and str(test.id) == request.POST['id']: form = DeleteTestForm(request.POST, instance=test) if form.is_valid(): - product = test.engagement.product if get_setting("ASYNC_OBJECT_DELETE"): async_del = async_delete() async_del.delete(test) @@ -334,13 +333,6 @@ def delete_test(request, tid): messages.SUCCESS, message, extra_tags='alert-success') - create_notification(event='other', - title=_(f"Deletion of {test.title}"), - product=product, - description=_(f'The test "{test.title}" was deleted by {request.user}'), - url=request.build_absolute_uri(reverse('view_engagement', args=(eng.id, ))), - recipients=[test.engagement.lead], - icon="exclamation-triangle") return HttpResponseRedirect(reverse('view_engagement', args=(eng.id,))) rels = ['Previewing the relationships has been disabled.', ''] @@ -380,7 +372,7 @@ def copy_test(request, tid): messages.SUCCESS, 'Test Copied successfully.', extra_tags='alert-success') - create_notification(event='other', + create_notification(event='test_copied', # TODO - if 'copy' functionality will be supported by API as well, 'create_notification' needs to be migrated to place where it will be able to cover actions from both interfaces title=f'Copying of {test.title}', description=f'The test "{test.title}" was copied by {request.user} to {engagement.name}', product=product, @@ -624,9 +616,14 @@ def process_forms(self, request: HttpRequest, test: Test, context: dict): ) burp_rr.clean() burp_rr.save() + + # Note: this notification has not be moved to "@receiver(post_save, sender=Finding)" method as many other notifications + # Because it could generate too much noise, we keep it here only for findings created by hand in WebUI + # TODO: but same should be implemented for API endpoint + # Create a notification create_notification( - event='other', + event='finding_added', title=_(f'Addition of {finding.title}'), finding=finding, description=_(f'Finding "{finding.title}" was added by {request.user}'), diff --git a/dojo/tools/api_sonarqube/importer.py b/dojo/tools/api_sonarqube/importer.py index 11406eb5a5a..79794e3a569 100644 --- a/dojo/tools/api_sonarqube/importer.py +++ b/dojo/tools/api_sonarqube/importer.py @@ -207,7 +207,7 @@ def import_issues(self, test): except Exception as e: logger.exception(e) create_notification( - event="other", + event="sonarqube_failed", title="SonarQube API import issue", description=e, icon="exclamation-triangle", @@ -330,7 +330,7 @@ def import_hotspots(self, test): except Exception as e: logger.exception(e) create_notification( - event="other", + event="sonarqube_failed", title="SonarQube API import issue", description=e, icon="exclamation-triangle", diff --git a/dojo/utils.py b/dojo/utils.py index f5605b9dbab..daaa545b010 100644 --- a/dojo/utils.py +++ b/dojo/utils.py @@ -1381,7 +1381,7 @@ def reopen_external_issue(find, note, external_issue_provider, **kwargs): reopen_external_issue_github(find, note, prod, eng) -def process_notifications(request, note, parent_url, parent_title): +def process_tag_notifications(request, note, parent_url, parent_title): regex = re.compile(r'(?:\A|\s)@(\w+)\b') usernames_to_check = set(un.lower() for un in regex.findall(note.entry)) # noqa: C401 @@ -1763,15 +1763,6 @@ def user_post_save(sender, instance, created, **kwargs): instance.save() -@receiver(post_save, sender=Engagement) -def engagement_post_Save(sender, instance, created, **kwargs): - if created: - engagement = instance - title = 'Engagement created for ' + str(engagement.product) + ': ' + str(engagement.name) - create_notification(event='engagement_added', title=title, engagement=engagement, product=engagement.product, - url=reverse('view_engagement', args=(engagement.id,))) - - def is_safe_url(url): try: # available in django 3+ diff --git a/unittests/test_notifications.py b/unittests/test_notifications.py index cad7106cfb3..941b648753d 100644 --- a/unittests/test_notifications.py +++ b/unittests/test_notifications.py @@ -1,6 +1,26 @@ from unittest.mock import patch -from dojo.models import DEFAULT_NOTIFICATION, Notifications, Product, User +from auditlog.context import set_actor +from django.test import override_settings +from django.urls import reverse +from django.utils import timezone +from rest_framework.authtoken.models import Token +from rest_framework.test import APIClient, APITestCase + +from dojo.models import ( + DEFAULT_NOTIFICATION, + Alerts, + Dojo_User, + Endpoint, + Engagement, + Finding_Group, + Notifications, + Product, + Product_Type, + Test, + Test_Type, + User, +) from dojo.notifications.helper import create_notification, send_alert_notification from .dojo_test_case import DojoTestCase @@ -65,7 +85,7 @@ def test_notifications_system_level_trump(self, mock): notif_user, _ = Notifications.objects.get_or_create(user=User.objects.get(username='admin')) notif_system, _ = Notifications.objects.get_or_create(user=None, template=False) - last_count = 0 + last_count = mock.call_count with self.subTest('user off, system off'): notif_user.user_mentioned = () # no alert notif_user.save() @@ -102,3 +122,274 @@ def test_notifications_system_level_trump(self, mock): create_notification(event="user_mentioned", title="user_mentioned", recipients=['admin']) self.assertEqual(mock.call_count, last_count + 1) last_count = mock.call_count + + @patch('dojo.notifications.helper.send_alert_notification', wraps=send_alert_notification) + def test_non_default_other_notifications(self, mock): + notif_user, _ = Notifications.objects.get_or_create(user=User.objects.get(username='admin')) + notif_system, _ = Notifications.objects.get_or_create(user=None, template=False) + + last_count = mock.call_count + with self.subTest('do not notify other'): + notif_user.other = () # no alert + notif_user.save() + create_notification(event="dummy_bar_event", recipients=['admin']) + self.assertEqual(mock.call_count, last_count) + + last_count = mock.call_count + with self.subTest('notify other'): + notif_user.other = DEFAULT_NOTIFICATION # alert only + notif_user.save() + create_notification(event="dummy_foo_event", title="title_for_dummy_foo_event", description="description_for_dummy_foo_event", recipients=['admin']) + self.assertEqual(mock.call_count, last_count + 1) + self.assertEqual(mock.call_args_list[0].args[0], 'dummy_foo_event') + alert = Alerts.objects.get(title='title_for_dummy_foo_event') + self.assertEqual(alert.source, "Dummy Foo Event") + + last_count = mock.call_count + with self.subTest('user off, system off'): + notif_user.user_mentioned = () # no alert + notif_user.save() + notif_system.user_mentioned = () # no alert + notif_system.save() + create_notification(event="user_mentioned", title="user_mentioned", recipients=['admin']) + self.assertEqual(mock.call_count, last_count + 0) + + last_count = mock.call_count + with self.subTest('user off, system on'): + notif_user.user_mentioned = () # no alert + notif_user.save() + notif_system.user_mentioned = DEFAULT_NOTIFICATION # alert only + notif_system.save() + create_notification(event="user_mentioned", title="user_mentioned", recipients=['admin']) + self.assertEqual(mock.call_count, last_count + 1) + + # Small note for this test-cast: Trump works only in positive direction - system is not able to disable some kind of notification if user enabled it + last_count = mock.call_count + with self.subTest('user on, system off'): + notif_user.user_mentioned = DEFAULT_NOTIFICATION # alert only + notif_user.save() + notif_system.user_mentioned = () # no alert + notif_system.save() + create_notification(event="user_mentioned", title="user_mentioned", recipients=['admin']) + self.assertEqual(mock.call_count, last_count + 1) + + last_count = mock.call_count + with self.subTest('user on, system on'): + notif_user.user_mentioned = DEFAULT_NOTIFICATION # alert only + notif_user.save() + notif_system.user_mentioned = DEFAULT_NOTIFICATION # alert only + notif_system.save() + create_notification(event="user_mentioned", title="user_mentioned", recipients=['admin']) + self.assertEqual(mock.call_count, last_count + 1) + + +class TestNotificationTriggers(DojoTestCase): + fixtures = ['dojo_testdata.json'] + + def setUp(self): + self.notification_tester = Dojo_User.objects.get(username="admin") + + @patch('dojo.notifications.helper.process_notifications') + def test_product_types(self, mock): + + last_count = mock.call_count + with self.subTest('product_type_added'): + with set_actor(self.notification_tester): + prod_type = Product_Type.objects.create(name='notif prod type') + self.assertEqual(mock.call_count, last_count + 4) + self.assertEqual(mock.call_args_list[-1].args[0], 'product_type_added') + self.assertEqual(mock.call_args_list[-1].kwargs['url'], f'/product/type/{prod_type.id}') + + last_count = mock.call_count + with self.subTest('product_type_deleted'): + with set_actor(self.notification_tester): + prod_type.delete() + self.assertEqual(mock.call_count, last_count + 1) + self.assertEqual(mock.call_args_list[-1].args[0], 'product_type_deleted') + self.assertEqual(mock.call_args_list[-1].kwargs['description'], 'The product type "notif prod type" was deleted by admin') + self.assertEqual(mock.call_args_list[-1].kwargs['url'], '/product/type') + + @patch('dojo.notifications.helper.process_notifications') + def test_products(self, mock): + + last_count = mock.call_count + with self.subTest('product_added'): + with set_actor(self.notification_tester): + prod_type = Product_Type.objects.first() + prod, _ = Product.objects.get_or_create(prod_type=prod_type, name='prod name') + self.assertEqual(mock.call_count, last_count + 5) + self.assertEqual(mock.call_args_list[-1].args[0], 'product_added') + self.assertEqual(mock.call_args_list[-1].kwargs['url'], f'/product/{prod.id}') + + last_count = mock.call_count + with self.subTest('product_deleted'): + with set_actor(self.notification_tester): + prod.delete() + self.assertEqual(mock.call_count, last_count + 2) + self.assertEqual(mock.call_args_list[-1].args[0], 'product_deleted') + self.assertEqual(mock.call_args_list[-1].kwargs['description'], 'The product "prod name" was deleted by admin') + self.assertEqual(mock.call_args_list[-1].kwargs['url'], '/product') + + @patch('dojo.notifications.helper.process_notifications') + def test_engagements(self, mock): + + last_count = mock.call_count + with self.subTest('engagement_added'): + with set_actor(self.notification_tester): + prod = Product.objects.first() + eng = Engagement.objects.create(product=prod, target_start=timezone.now(), target_end=timezone.now()) + self.assertEqual(mock.call_count, last_count + 5) + self.assertEqual(mock.call_args_list[-1].args[0], 'engagement_added') + self.assertEqual(mock.call_args_list[-1].kwargs['url'], f'/engagement/{eng.id}') + + last_count = mock.call_count + with self.subTest('close_engagement'): + with set_actor(self.notification_tester): + eng.status = "Completed" + eng.save() + self.assertEqual(mock.call_count, last_count + 5) + self.assertEqual(mock.call_args_list[-1].args[0], 'engagement_closed') + self.assertEqual(mock.call_args_list[-1].kwargs['url'], f'/engagement/{eng.id}/finding/all') + + last_count = mock.call_count + with self.subTest('reopen_engagement'): + with set_actor(self.notification_tester): + eng.status = "In Progress" + eng.save() + self.assertEqual(mock.call_count, last_count + 5) + self.assertEqual(mock.call_args_list[-1].args[0], 'engagement_reopened') + self.assertEqual(mock.call_args_list[-1].kwargs['url'], f'/engagement/{eng.id}') + + eng.status = "Not Started" + eng.save() + last_count = mock.call_count + with self.subTest('no reopen_engagement from not started'): + with set_actor(self.notification_tester): + eng.status = "In Progress" + eng.save() + self.assertEqual(mock.call_count, last_count) + + prod_type = Product_Type.objects.first() + prod1, _ = Product.objects.get_or_create(prod_type=prod_type, name='prod name 1') + _ = Engagement.objects.create(product=prod1, target_start=timezone.now(), target_end=timezone.now(), lead=User.objects.get(username='admin')) + prod2, _ = Product.objects.get_or_create(prod_type=prod_type, name='prod name 2') + eng2 = Engagement.objects.create(product=prod2, name="Testing engagement", target_start=timezone.now(), target_end=timezone.now(), lead=User.objects.get(username='admin')) + + with self.subTest('engagement_deleted by product'): # in case of product removal, we are not notifying about removal + with set_actor(self.notification_tester): + prod1.delete() + for call in mock.call_args_list: + self.assertNotEqual(call.args[0], 'engagement_deleted') + + last_count = mock.call_count + with self.subTest('engagement_deleted itself'): + with set_actor(self.notification_tester): + eng2.delete() + self.assertEqual(mock.call_count, last_count + 1) + self.assertEqual(mock.call_args_list[-1].args[0], 'engagement_deleted') + self.assertEqual(mock.call_args_list[-1].kwargs['description'], 'The engagement "Testing engagement" was deleted by admin') + self.assertEqual(mock.call_args_list[-1].kwargs['url'], f'/product/{prod2.id}') + + @patch('dojo.notifications.helper.process_notifications') + def test_endpoints(self, mock): + prod_type = Product_Type.objects.first() + prod1, _ = Product.objects.get_or_create(prod_type=prod_type, name='prod name 1') + Endpoint.objects.get_or_create(product=prod1, host='host1') + prod2, _ = Product.objects.get_or_create(prod_type=prod_type, name='prod name 2') + endpoint2, _ = Endpoint.objects.get_or_create(product=prod2, host='host2') + + with self.subTest('endpoint_deleted by product'): # in case of product removal, we are not notifying about removal + with set_actor(self.notification_tester): + prod1.delete() + for call in mock.call_args_list: + self.assertNotEqual(call.args[0], 'endpoint_deleted') + + last_count = mock.call_count + with self.subTest('endpoint_deleted itself'): + with set_actor(self.notification_tester): + endpoint2.delete() + self.assertEqual(mock.call_count, last_count + 2) + self.assertEqual(mock.call_args_list[-1].args[0], 'endpoint_deleted') + self.assertEqual(mock.call_args_list[-1].kwargs['description'], 'The endpoint "host2" was deleted by admin') + self.assertEqual(mock.call_args_list[-1].kwargs['url'], '/endpoint') + + @patch('dojo.notifications.helper.process_notifications') + def test_tests(self, mock): + prod_type = Product_Type.objects.first() + prod, _ = Product.objects.get_or_create(prod_type=prod_type, name='prod name') + eng1 = Engagement.objects.create(product=prod, target_start=timezone.now(), target_end=timezone.now(), lead=User.objects.get(username='admin')) + Test.objects.create(engagement=eng1, target_start=timezone.now(), target_end=timezone.now(), test_type_id=Test_Type.objects.first().id) + eng2 = Engagement.objects.create(product=prod, target_start=timezone.now(), target_end=timezone.now(), lead=User.objects.get(username='admin')) + test2 = Test.objects.create(engagement=eng2, target_start=timezone.now(), target_end=timezone.now(), test_type_id=Test_Type.objects.first().id) + + with self.subTest('test_deleted by engagement'): # in case of engagement removal, we are not notifying about removal + with set_actor(self.notification_tester): + eng1.delete() + for call in mock.call_args_list: + self.assertNotEqual(call.args[0], 'test_deleted') + + last_count = mock.call_count + with self.subTest('test_deleted itself'): + with set_actor(self.notification_tester): + test2.delete() + self.assertEqual(mock.call_count, last_count + 1) + self.assertEqual(mock.call_args_list[-1].args[0], 'test_deleted') + self.assertEqual(mock.call_args_list[-1].kwargs['description'], 'The test "Acunetix Scan" was deleted by admin') + self.assertEqual(mock.call_args_list[-1].kwargs['url'], f'/engagement/{eng2.id}') + + @patch('dojo.notifications.helper.process_notifications') + def test_finding_groups(self, mock): + prod_type = Product_Type.objects.first() + prod, _ = Product.objects.get_or_create(prod_type=prod_type, name='prod name') + eng, _ = Engagement.objects.get_or_create(product=prod, target_start=timezone.now(), target_end=timezone.now(), lead=User.objects.get(username='admin')) + test1, _ = Test.objects.get_or_create(engagement=eng, target_start=timezone.now(), target_end=timezone.now(), test_type_id=Test_Type.objects.first().id) + Finding_Group.objects.get_or_create(test=test1, creator=User.objects.get(username='admin')) + test2, _ = Test.objects.get_or_create(engagement=eng, target_start=timezone.now(), target_end=timezone.now(), test_type_id=Test_Type.objects.first().id) + fg2, _ = Finding_Group.objects.get_or_create(test=test2, name="fg test", creator=User.objects.get(username='admin')) + + with self.subTest('test_deleted by engagement'): # in case of engagement removal, we are not notifying about removal + with set_actor(self.notification_tester): + test1.delete() + for call in mock.call_args_list: + self.assertNotEqual(call.args[0], 'finding_group_deleted') + + last_count = mock.call_count + with self.subTest('test_deleted itself'): + with set_actor(self.notification_tester): + fg2.delete() + self.assertEqual(mock.call_count, last_count + 5) + self.assertEqual(mock.call_args_list[-1].args[0], 'finding_group_deleted') + self.assertEqual(mock.call_args_list[-1].kwargs['description'], 'The finding group "fg test" was deleted by admin') + self.assertEqual(mock.call_args_list[-1].kwargs['url'], f'/test/{test2.id}') + + @patch('dojo.notifications.helper.process_notifications') + @override_settings(ENABLE_AUDITLOG=True) + def test_auditlog_on(self, mock): + prod_type = Product_Type.objects.create(name='notif prod type') + with set_actor(self.notification_tester): + prod_type.delete() + self.assertEqual(mock.call_args_list[-1].kwargs['description'], 'The product type "notif prod type" was deleted by admin') + + @patch('dojo.notifications.helper.process_notifications') + @override_settings(ENABLE_AUDITLOG=False) + def test_auditlog_off(self, mock): + prod_type = Product_Type.objects.create(name='notif prod type') + with set_actor(self.notification_tester): + prod_type.delete() + self.assertEqual(mock.call_args_list[-1].kwargs['description'], 'The product type "notif prod type" was deleted') + + +class TestNotificationTriggersApi(APITestCase): + fixtures = ['dojo_testdata.json'] + + def setUp(self): + token = Token.objects.get(user__username='admin') + self.client = APIClient() + self.client.credentials(HTTP_AUTHORIZATION='Token ' + token.key) + + @patch('dojo.notifications.helper.process_notifications') + @override_settings(ENABLE_AUDITLOG=True) + def test_auditlog_on(self, mock): + prod_type = Product_Type.objects.create(name='notif prod type API') + self.client.delete(reverse('product_type-detail', args=(prod_type.pk,)), format='json') + self.assertEqual(mock.call_args_list[-1].kwargs['description'], 'The product type "notif prod type API" was deleted by admin') diff --git a/unittests/test_rest_framework.py b/unittests/test_rest_framework.py index 0427ceb1d90..823a10f3b59 100644 --- a/unittests/test_rest_framework.py +++ b/unittests/test_rest_framework.py @@ -1708,7 +1708,7 @@ def __init__(self, *args, **kwargs): } self.update_fields = {"first_name": "test changed", "configuration_permissions": [219, 220]} self.test_type = TestType.CONFIGURATION_PERMISSIONS - self.deleted_objects = 19 + self.deleted_objects = 25 BaseClass.RESTEndpointTest.__init__(self, *args, **kwargs) def test_create_user_with_non_configuration_permissions(self):