create base class, re-use code for import and reimport
hblankenship committed Oct 15, 2024
1 parent 13c179d commit 20eee47
Showing 1 changed file with 86 additions and 233 deletions.
319 changes: 86 additions & 233 deletions dojo/api_v2/serializers.py
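
The refactor below relies on Django REST Framework serializer inheritance: fields and hook methods declared on a base Serializer are collected by DRF's metaclass and inherited by every subclass, which is what lets the shared fields, validate() and get_importer() live once in CommonImportScanSerializer. The following is a minimal, self-contained sketch of that pattern (illustrative class and field names only, not the DefectDojo code, and assuming a configured Django/DRF environment):

from rest_framework import serializers


class CommonScanSerializer(serializers.Serializer):
    # Declared fields are collected by DRF's SerializerMetaclass and
    # inherited by every subclass, so they only need to be defined once.
    scan_date = serializers.DateField(required=False)
    minimum_severity = serializers.ChoiceField(
        choices=["Info", "Low", "Medium", "High", "Critical"], default="Info",
    )

    def get_importer(self, **kwargs):
        # Hook method: subclasses decide which importer to instantiate.
        raise NotImplementedError


class ScanImportSerializer(CommonScanSerializer):
    # Only the import-specific field lives here; the rest is inherited.
    engagement_id = serializers.IntegerField(required=False)

    def get_importer(self, **kwargs):
        return "importer"  # placeholder for a real importer object


class ScanReimportSerializer(CommonScanSerializer):
    test_id = serializers.IntegerField(required=False)

    def get_importer(self, **kwargs):
        return "reimporter"  # placeholder for a real reimporter object

With this layout a subclass instance exposes the inherited fields alongside its own declarations, which is how ImportScanSerializer and ReImportScanSerializer pick up everything declared on CommonImportScanSerializer in the diff below.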
@@ -2047,7 +2047,7 @@ def get_findings_list(self, obj) -> List[int]:
return obj.open_findings_list


class ImportScanSerializer(serializers.Serializer):
class CommonImportScanSerializer(serializers.Serializer):
scan_date = serializers.DateField(
required=False,
help_text="Scan completion date will be used on all findings.",
@@ -2064,6 +2064,7 @@ class ImportScanSerializer(serializers.Serializer):
verified = serializers.BooleanField(
help_text="Override the verified setting from the tool.",
)

scan_type = serializers.ChoiceField(choices=get_choices_sorted())
# TODO: why do we allow only existing endpoints?
endpoint_to_add = serializers.PrimaryKeyRelatedField(
@@ -2085,9 +2086,7 @@ class ImportScanSerializer(serializers.Serializer):
required=False,
help_text="Resource link to source code",
)
engagement = serializers.PrimaryKeyRelatedField(
queryset=Engagement.objects.all(), required=False,
)

test_title = serializers.CharField(required=False)
auto_create_context = serializers.BooleanField(required=False)
deduplication_on_engagement = serializers.BooleanField(required=False)
@@ -2147,9 +2146,6 @@ class ImportScanSerializer(serializers.Serializer):
# extra fields populated in response
# need to use the _id suffix as without the serializer framework gets
# confused
test = serializers.IntegerField(
read_only=True,
) # left for backwards compatibility
test_id = serializers.IntegerField(read_only=True)
engagement_id = serializers.IntegerField(read_only=True)
product_id = serializers.IntegerField(read_only=True)
@@ -2164,6 +2160,88 @@ class ImportScanSerializer(serializers.Serializer):
required=False,
)

def get_importer(
self,
**kwargs: dict,
) -> BaseImporter:
"""
Returns a new instance of an importer that extends
the BaseImporter class
"""
return DefaultImporter(**kwargs)

def process_scan(
self,
data: dict,
context: dict,
) -> None:
"""
Process the scan with all of the supplied data fully massaged
into the format we are expecting
Raises exceptions in the event of an error
"""
try:
importer = self.get_importer(**context)
context["test"], _, _, _, _, _, _ = importer.process_scan(
context.pop("scan", None),
)
# Update the response body with some new data
if test := context.get("test"):
data["test"] = test.id
data["test_id"] = test.id
data["engagement_id"] = test.engagement.id
data["product_id"] = test.engagement.product.id
data["product_type_id"] = test.engagement.product.prod_type.id
data["statistics"] = {"after": test.statistics}
# convert to exception otherwise django rest framework will swallow them as 400 error
# exceptions are already logged in the importer
except SyntaxError as se:
raise Exception(se)
except ValueError as ve:
raise Exception(ve)

def validate(self, data: dict) -> dict:
scan_type = data.get("scan_type")
file = data.get("file")
if not file and requires_file(scan_type):
msg = f"Uploading a Report File is required for {scan_type}"
raise serializers.ValidationError(msg)
if file and is_scan_file_too_large(file):
msg = f"Report file is too large. Maximum supported size is {settings.SCAN_FILE_MAX_SIZE} MB"
raise serializers.ValidationError(msg)
tool_type = requires_tool_type(scan_type)
if tool_type:
api_scan_configuration = data.get("api_scan_configuration")
if (
api_scan_configuration
and tool_type
!= api_scan_configuration.tool_configuration.tool_type.name
):
msg = f"API scan configuration must be of tool type {tool_type}"
raise serializers.ValidationError(msg)
return data

def validate_scan_date(self, value: str) -> None:
if value and value > timezone.localdate():
msg = "The scan_date cannot be in the future!"
raise serializers.ValidationError(msg)
return value


class ImportScanSerializer(CommonImportScanSerializer):

engagement = serializers.PrimaryKeyRelatedField(
queryset=Engagement.objects.all(), required=False,
)

# extra fields populated in response
# need to use the _id suffix as without the serializer framework gets
# confused
test = serializers.IntegerField(
read_only=True,
) # left for backwards compatibility

def set_context(
self,
data: dict,
@@ -2242,47 +2320,6 @@ def process_auto_create_create_context(
# Raise an explicit drf exception here
raise ValidationError(str(e))

def get_importer(
self,
**kwargs: dict,
) -> BaseImporter:
"""
Returns a new instance of an importer that extends
the BaseImporter class
"""
return DefaultImporter(**kwargs)

def process_scan(
self,
data: dict,
context: dict,
) -> None:
"""
Process the scan with all of the supplied data fully massaged
into the format we are expecting
Raises exceptions in the event of an error
"""
try:
importer = self.get_importer(**context)
context["test"], _, _, _, _, _, _ = importer.process_scan(
context.pop("scan", None),
)
# Update the response body with some new data
if test := context.get("test"):
data["test"] = test.id
data["test_id"] = test.id
data["engagement_id"] = test.engagement.id
data["product_id"] = test.engagement.product.id
data["product_type_id"] = test.engagement.product.prod_type.id
data["statistics"] = {"after": test.statistics}
# convert to exception otherwise django rest framework will swallow them as 400 error
# exceptions are already logged in the importer
except SyntaxError as se:
raise Exception(se)
except ValueError as ve:
raise Exception(ve)

def save(self, push_to_jira=False):
# Go through the validate method
data = self.validated_data
@@ -2293,163 +2330,16 @@ def save(self, push_to_jira=False):
# Import the scan with all of the supplied data
self.process_scan(data, context)

def validate(self, data: dict) -> dict:
scan_type = data.get("scan_type")
file = data.get("file")
if not file and requires_file(scan_type):
msg = f"Uploading a Report File is required for {scan_type}"
raise serializers.ValidationError(msg)
if file and is_scan_file_too_large(file):
msg = f"Report file is too large. Maximum supported size is {settings.SCAN_FILE_MAX_SIZE} MB"
raise serializers.ValidationError(msg)
tool_type = requires_tool_type(scan_type)
if tool_type:
api_scan_configuration = data.get("api_scan_configuration")
if (
api_scan_configuration
and tool_type
!= api_scan_configuration.tool_configuration.tool_type.name
):
msg = f"API scan configuration must be of tool type {tool_type}"
raise serializers.ValidationError(msg)
return data

def validate_scan_date(self, value: str) -> None:
if value and value > timezone.localdate():
msg = "The scan_date cannot be in the future!"
raise serializers.ValidationError(msg)
return value

class ReImportScanSerializer(TaggitSerializer, CommonImportScanSerializer):

class ReImportScanSerializer(TaggitSerializer, serializers.Serializer):
scan_date = serializers.DateField(
required=False,
help_text="Scan completion date will be used on all findings.",
)
minimum_severity = serializers.ChoiceField(
choices=SEVERITY_CHOICES,
default="Info",
help_text="Minimum severity level to be imported",
)
active = serializers.BooleanField(
help_text="Override the active setting from the tool.",
)
verified = serializers.BooleanField(
help_text="Override the verified setting from the tool.",
)
help_do_not_reactivate = "Select if the import should ignore active findings from the report, useful for triage-less scanners. Will keep existing findings closed, without reactivating them. For more information check the docs."
do_not_reactivate = serializers.BooleanField(
default=False, required=False, help_text=help_do_not_reactivate,
)
scan_type = serializers.ChoiceField(
choices=get_choices_sorted(), required=True,
)
endpoint_to_add = serializers.PrimaryKeyRelatedField(
queryset=Endpoint.objects.all(),
required=False,
default=None,
help_text="Enter the ID of an Endpoint that is associated with the target Product. New Findings will be added to that Endpoint.",
)
file = serializers.FileField(allow_empty_file=True, required=False)
product_type_name = serializers.CharField(required=False)
product_name = serializers.CharField(required=False)
engagement_name = serializers.CharField(required=False)
engagement_end_date = serializers.DateField(
required=False,
help_text="End Date for Engagement. Default is current time + 365 days. Required format year-month-day",
)
source_code_management_uri = serializers.URLField(
max_length=600,
required=False,
help_text="Resource link to source code",
)
test = serializers.PrimaryKeyRelatedField(
required=False, queryset=Test.objects.all(),
)
test_title = serializers.CharField(required=False)
auto_create_context = serializers.BooleanField(required=False)
deduplication_on_engagement = serializers.BooleanField(required=False)

push_to_jira = serializers.BooleanField(default=False)
# Close the old findings if the parameter is not provided. This is to
# maintain the old API behavior after reintroducing the close_old_findings parameter
# also for ReImport.
close_old_findings = serializers.BooleanField(
required=False,
default=True,
help_text="Select if old findings no longer present in the report get closed as mitigated when importing.",
)
close_old_findings_product_scope = serializers.BooleanField(
required=False,
default=False,
help_text="Select if close_old_findings applies to all findings of the same type in the product. "
"By default, it is false meaning that only old findings of the same type in the engagement are in scope. "
"Note that this only applies on the first call to reimport-scan.",
)
version = serializers.CharField(
required=False,
help_text="Version that will be set on existing Test object. Leave empty to leave existing value in place.",
)
build_id = serializers.CharField(
required=False, help_text="ID of the build that was scanned.",
)
branch_tag = serializers.CharField(
required=False, help_text="Branch or Tag that was scanned.",
)
commit_hash = serializers.CharField(
required=False, help_text="Commit that was scanned.",
)
api_scan_configuration = serializers.PrimaryKeyRelatedField(
allow_null=True,
default=None,
queryset=Product_API_Scan_Configuration.objects.all(),
)
service = serializers.CharField(
required=False,
help_text="A service is a self-contained piece of functionality within a Product. "
"This is an optional field which is used in deduplication and closing of old findings when set. "
"This affects the whole engagement/product depending on your deduplication scope.",
)
environment = serializers.CharField(required=False)
lead = serializers.PrimaryKeyRelatedField(
allow_null=True, default=None, queryset=User.objects.all(),
)
tags = TagListSerializerField(
required=False,
allow_empty=True,
help_text="Modify existing tags that help describe this scan. (Existing test tags will be overwritten)",
)

group_by = serializers.ChoiceField(
required=False,
choices=Finding_Group.GROUP_BY_OPTIONS,
help_text="Choose an option to automatically group new findings by the chosen option.",
)
create_finding_groups_for_all_findings = serializers.BooleanField(
help_text="If set to false, finding groups will only be created when there is more than one grouped finding",
required=False,
default=True,
)

# extra fields populated in response
# need to use the _id suffix as without the serializer framework gets
# confused
test_id = serializers.IntegerField(read_only=True)
engagement_id = serializers.IntegerField(
read_only=True,
) # need to use the _id suffix as without the serializer framework gets confused
product_id = serializers.IntegerField(read_only=True)
product_type_id = serializers.IntegerField(read_only=True)

statistics = ImportStatisticsSerializer(read_only=True, required=False)
apply_tags_to_findings = serializers.BooleanField(
help_text="If set to True, the tags will be applied to the findings",
required=False,
)
apply_tags_to_endpoints = serializers.BooleanField(
help_text="If set to True, the tags will be applied to the endpoints",
required=False,
)

def set_context(
self,
@@ -2529,16 +2419,6 @@ def process_auto_create_create_context(
# Raise an explicit drf exception here
raise ValidationError(str(e))

def get_importer(
self,
**kwargs: dict,
) -> BaseImporter:
"""
Returns a new instance of an importer that extends
the BaseImporter class
"""
return DefaultImporter(**kwargs)

def get_reimporter(
self,
**kwargs: dict,
@@ -2617,33 +2497,6 @@ def save(self, push_to_jira=False):
# Import the scan with all of the supplied data
self.process_scan(auto_create_manager, data, context)

def validate(self, data):
scan_type = data.get("scan_type")
file = data.get("file")
if not file and requires_file(scan_type):
msg = f"Uploading a Report File is required for {scan_type}"
raise serializers.ValidationError(msg)
if file and is_scan_file_too_large(file):
msg = f"Report file is too large. Maximum supported size is {settings.SCAN_FILE_MAX_SIZE} MB"
raise serializers.ValidationError(msg)
tool_type = requires_tool_type(scan_type)
if tool_type:
api_scan_configuration = data.get("api_scan_configuration")
if (
api_scan_configuration
and tool_type
!= api_scan_configuration.tool_configuration.tool_type.name
):
msg = f"API scan configuration must be of tool type {tool_type}"
raise serializers.ValidationError(msg)
return data

def validate_scan_date(self, value):
if value and value > timezone.localdate():
msg = "The scan_date cannot be in the future!"
raise serializers.ValidationError(msg)
return value


class EndpointMetaImporterSerializer(serializers.Serializer):
file = serializers.FileField(required=True)
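The validate() and validate_scan_date() hooks that moved into CommonImportScanSerializer give both endpoints the same checks: a report file is mandatory for scan types that need one, oversized uploads are rejected, the API scan configuration must match the required tool type, and a scan_date in the future is refused. The file and date checks can be distilled into the standalone sketch below; the helper name, size limit and scan-type tuple are illustrative assumptions, while the real code uses the requires_file and is_scan_file_too_large helpers and settings.SCAN_FILE_MAX_SIZE shown above.

import datetime


def check_scan_upload(scan_type, file_size_mb, scan_date,
                      max_size_mb=100,
                      scan_types_requiring_file=("ZAP Scan",)):
    """Sketch of the shared file/date checks in CommonImportScanSerializer."""
    errors = []
    # A report file is mandatory for scan types that parse an uploaded report.
    if file_size_mb is None and scan_type in scan_types_requiring_file:
        errors.append(f"Uploading a Report File is required for {scan_type}")
    # Oversized uploads are rejected outright.
    if file_size_mb is not None and file_size_mb > max_size_mb:
        errors.append(
            f"Report file is too large. Maximum supported size is {max_size_mb} MB",
        )
    # A scan completion date in the future is refused.
    if scan_date is not None and scan_date > datetime.date.today():
        errors.append("The scan_date cannot be in the future!")
    return errors

For example, check_scan_upload("ZAP Scan", None, datetime.date(2030, 1, 1)) returns two messages; in the serializer the same conditions surface as a 400 ValidationError.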
