Skip to content

Commit

Permalink
Importer + Reimport: Reorg, cleanup, comment (#10011)
Browse files Browse the repository at this point in the history
* First pass at base importer class

* Further implementation and comments

* Further implementation and comments

* (untested) complete default importer

* Importer clean up process_finding function

* Complete import CBV (untested)

* Importer: passing manual testing

* Reimporter + auto create context manager (untested)

* Completed testing by hand

* Add missing tags patch

* Correct some unit tests

* Pass API tests

* Complete reimport CBV

* Fix URL mappings

* Remove old importers

* Correct missed import error

* Remove testing cruft

* Fix ruff

* Remove extraneous comment

* Fix flake 8

* Fix some copy/paste errors

* Correcting unit tests

* Remove lookups for a function called excessively

* Fix ruff errors

* More unit test fun

* Fix ruff stuff

* Flake8

* Fix typo

* Fix conflicts

* Fix quoting in unit test

* Fix tests for real...

* Some feedback

* Fix oopsies

* Fix some feedback

* Typo

* testing before comitting..
  • Loading branch information
Maffooch authored May 3, 2024
1 parent 4b13d32 commit 6ad06a9
Show file tree
Hide file tree
Showing 21 changed files with 4,087 additions and 2,746 deletions.
206 changes: 89 additions & 117 deletions dojo/api_v2/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,13 @@
ValidationError,
)

from dojo.api_v2.serializers import (
get_import_meta_data_from_dict,
get_product_id_from_dict,
)
from dojo.authorization.authorization import (
user_has_configuration_permission,
user_has_global_permission,
user_has_permission,
)
from dojo.authorization.roles_permissions import Permissions
from dojo.importers.reimporter.utils import (
get_target_engagement_if_exists,
get_target_product_by_id_if_exists,
get_target_product_if_exists,
get_target_product_type_if_exists,
get_target_test_if_exists,
)
from dojo.importers.auto_create_context import AutoCreateContextManager
from dojo.models import (
Cred_Mapping,
Dojo_Group,
Expand Down Expand Up @@ -427,58 +417,52 @@ class UserHasImportPermission(permissions.BasePermission):
def has_permission(self, request, view):
# permission check takes place before validation, so we don't have access to serializer.validated_data()
# and we have to validate ourselves unfortunately

(
_,
_,
_,
engagement_id,
engagement_name,
product_name,
product_type_name,
auto_create_context,
_deduplication_on_engagement,
_do_not_reactivate,
) = get_import_meta_data_from_dict(request.data)
product_type = get_target_product_type_if_exists(product_type_name)
product = get_target_product_if_exists(product_name, product_type_name)
engagement = get_target_engagement_if_exists(
engagement_id, engagement_name, product
)

if engagement:
auto_create = AutoCreateContextManager()
# Process the context to make an conversions needed. Catch any exceptions
# in this case and wrap them in a DRF exception
try:
converted_dict = auto_create.convert_querydict_to_dict(request.data)
auto_create.process_import_meta_data_from_dict(converted_dict)
# Get an existing product
converted_dict["product_type"] = auto_create.get_target_product_type_if_exists(**converted_dict)
converted_dict["product"] = auto_create.get_target_product_if_exists(**converted_dict)
converted_dict["engagement"] = auto_create.get_target_engagement_if_exists(**converted_dict)
except (ValueError, TypeError) as e:
# Raise an explicit drf exception here
raise ValidationError(e)
if engagement := converted_dict.get("engagement"):
# existing engagement, nothing special to check
return user_has_permission(
request.user, engagement, Permissions.Import_Scan_Result
)
elif engagement_id:
elif engagement_id := converted_dict.get("engagement_id"):
# engagement_id doesn't exist
msg = f"Engagement '{engagement_id}' doesn't exist"
msg = f"Engagement \"{engagement_id}\" does not exist"
raise serializers.ValidationError(msg)

if not auto_create_context:
if not converted_dict.get("auto_create_context"):
raise_no_auto_create_import_validation_error(
None,
None,
engagement_name,
product_name,
product_type_name,
engagement,
product,
product_type,
converted_dict.get("engagement_name"),
converted_dict.get("product_name"),
converted_dict.get("product_type_name"),
converted_dict.get("engagement"),
converted_dict.get("product"),
converted_dict.get("product_type"),
"Need engagement_id or product_name + engagement_name to perform import",
)
else:
# the engagement doesn't exist, so we need to check if the user has
# requested and is allowed to use auto_create
return check_auto_create_permission(
request.user,
product,
product_name,
engagement,
engagement_name,
product_type,
product_type_name,
converted_dict.get("product"),
converted_dict.get("product_name"),
converted_dict.get("engagement"),
converted_dict.get("engagement_name"),
converted_dict.get("product_type"),
converted_dict.get("product_type_name"),
"Need engagement_id or product_name + engagement_name to perform import",
)

Expand All @@ -487,32 +471,28 @@ class UserHasMetaImportPermission(permissions.BasePermission):
def has_permission(self, request, view):
# permission check takes place before validation, so we don't have access to serializer.validated_data()
# and we have to validate ourselves unfortunately

(
_,
_,
_,
_,
_,
product_name,
_,
_,
_,
_,
) = get_import_meta_data_from_dict(request.data)
product = get_target_product_if_exists(product_name)
if not product:
product_id = get_product_id_from_dict(request.data)
product = get_target_product_by_id_if_exists(product_id)
auto_create = AutoCreateContextManager()
# Process the context to make an conversions needed. Catch any exceptions
# in this case and wrap them in a DRF exception
try:
converted_dict = auto_create.convert_querydict_to_dict(request.data)
auto_create.process_import_meta_data_from_dict(converted_dict)
# Get an existing product
product = auto_create.get_target_product_if_exists(**converted_dict)
if not product:
product = auto_create.get_target_product_by_id_if_exists(**converted_dict)
except (ValueError, TypeError) as e:
# Raise an explicit drf exception here
raise ValidationError(e)

if product:
# existing product, nothing special to check
return user_has_permission(
request.user, product, Permissions.Import_Scan_Result
)
elif product_id:
elif product_id := converted_dict.get("product_id"):
# product_id doesn't exist
msg = f"product '{product_id}' doesn't exist"
msg = f"Product \"{product_id}\" does not exist"
raise serializers.ValidationError(msg)
else:
msg = "Need product_id or product_name to perform import"
Expand Down Expand Up @@ -631,62 +611,54 @@ class UserHasReimportPermission(permissions.BasePermission):
def has_permission(self, request, view):
# permission check takes place before validation, so we don't have access to serializer.validated_data()
# and we have to validate ourselves unfortunately

(
test_id,
test_title,
scan_type,
_,
engagement_name,
product_name,
product_type_name,
auto_create_context,
_deduplication_on_engagement,
_do_not_reactivate,
) = get_import_meta_data_from_dict(request.data)

product_type = get_target_product_type_if_exists(product_type_name)
product = get_target_product_if_exists(product_name, product_type_name)
engagement = get_target_engagement_if_exists(
None, engagement_name, product
)
test = get_target_test_if_exists(
test_id, test_title, scan_type, engagement
)

if test:
auto_create = AutoCreateContextManager()
# Process the context to make an conversions needed. Catch any exceptions
# in this case and wrap them in a DRF exception
try:
converted_dict = auto_create.convert_querydict_to_dict(request.data)
auto_create.process_import_meta_data_from_dict(converted_dict)
# Get an existing product
converted_dict["product_type"] = auto_create.get_target_product_type_if_exists(**converted_dict)
converted_dict["product"] = auto_create.get_target_product_if_exists(**converted_dict)
converted_dict["engagement"] = auto_create.get_target_engagement_if_exists(**converted_dict)
converted_dict["test"] = auto_create.get_target_test_if_exists(**converted_dict)
except (ValueError, TypeError) as e:
# Raise an explicit drf exception here
raise ValidationError(e)

if test := converted_dict.get("test"):
# existing test, nothing special to check
return user_has_permission(
request.user, test, Permissions.Import_Scan_Result
)
elif test_id:
elif test_id := converted_dict.get("test_id"):
# test_id doesn't exist
msg = f"Test '{test_id}' doesn't exist"
msg = f"Test \"{test_id}\" does not exist"
raise serializers.ValidationError(msg)

if not auto_create_context:
if not converted_dict.get("auto_create_context"):
raise_no_auto_create_import_validation_error(
test_title,
scan_type,
engagement_name,
product_name,
product_type_name,
engagement,
product,
product_type,
converted_dict.get("test_title"),
converted_dict.get("scan_type"),
converted_dict.get("engagement_name"),
converted_dict.get("product_name"),
converted_dict.get("product_type_name"),
converted_dict.get("engagement"),
converted_dict.get("product"),
converted_dict.get("product_type"),
"Need test_id or product_name + engagement_name + scan_type to perform reimport",
)
else:
# the test doesn't exist, so we need to check if the user has
# requested and is allowed to use auto_create
return check_auto_create_permission(
request.user,
product,
product_name,
engagement,
engagement_name,
product_type,
product_type_name,
converted_dict.get("product"),
converted_dict.get("product_name"),
converted_dict.get("engagement"),
converted_dict.get("engagement_name"),
converted_dict.get("product_type"),
converted_dict.get("product_type_name"),
"Need test_id or product_name + engagement_name + scan_type to perform reimport",
)

Expand Down Expand Up @@ -955,28 +927,28 @@ def raise_no_auto_create_import_validation_error(
raise ValidationError(msg)

if product_type_name and not product_type:
msg = f"Product Type '{product_type_name}' doesn't exist"
msg = f"Product Type \"{product_type_name}\" does not exist"
raise serializers.ValidationError(msg)

if product_name and not product:
if product_type_name:
msg = f"Product '{product_name}' doesn't exist in Product_Type '{product_type_name}'"
msg = f"Product \"{product_name}\" does not exist in Product_Type \"{product_type_name}\""
raise serializers.ValidationError(msg)
else:
msg = f"Product '{product_name}' doesn't exist"
msg = f"Product \"{product_name}\" does not exist"
raise serializers.ValidationError(msg)

if engagement_name and not engagement:
msg = f"Engagement '{engagement_name}' doesn't exist in Product '{product_name}'"
msg = f"Engagement \"{engagement_name}\" does not exist in Product \"{product_name}\""
raise serializers.ValidationError(msg)

# these are only set for reimport
if test_title:
msg = f"Test '{test_title}' with scan_type '{scan_type}' doesn't exist in Engagement '{engagement_name}'"
msg = f"Test \"{test_title}\" with scan_type \"{scan_type}\" does not exist in Engagement \"{engagement_name}\""
raise serializers.ValidationError(msg)

if scan_type:
msg = f"Test with scan_type '{scan_type}' doesn't exist in Engagement '{engagement_name}'"
msg = f"Test with scan_type \"{scan_type}\" does not exist in Engagement \"{engagement_name}\""
raise serializers.ValidationError(msg)

raise ValidationError(error_message)
Expand Down Expand Up @@ -1023,28 +995,28 @@ def check_auto_create_permission(

if product and product_name and engagement_name:
if not user_has_permission(user, product, Permissions.Engagement_Add):
msg = f"No permission to create engagements in product '{product_name}'"
msg = f"No permission to create engagements in product \"{product_name}\""
raise PermissionDenied(msg)

if not user_has_permission(
user, product, Permissions.Import_Scan_Result
):
msg = f"No permission to import scans into product '{product_name}'"
msg = f"No permission to import scans into product \"{product_name}\""
raise PermissionDenied(msg)

# all good
return True

if not product and product_name:
if not product_type_name:
msg = f"Product '{product_name}' doesn't exist and no product_type_name provided to create the new product in"
msg = f"Product \"{product_name}\" does not exist and no product_type_name provided to create the new product in"
raise serializers.ValidationError(msg)

if not product_type:
if not user_has_global_permission(
user, Permissions.Product_Type_Add
):
msg = f"No permission to create product_type '{product_type_name}'"
msg = f"No permission to create product_type \"{product_type_name}\""
raise PermissionDenied(msg)
# new product type can be created with current user as owner, so
# all objects in it can be created as well
Expand All @@ -1053,7 +1025,7 @@ def check_auto_create_permission(
if not user_has_permission(
user, product_type, Permissions.Product_Type_Add_Product
):
msg = f"No permission to create products in product_type '{product_type}'"
msg = f"No permission to create products in product_type \"{product_type}\""
raise PermissionDenied(msg)

# product can be created, so objects in it can be created as well
Expand Down
Loading

0 comments on commit 6ad06a9

Please sign in to comment.