From 20eee47f4c4ae2ac8ddf0e266316acb56934d673 Mon Sep 17 00:00:00 2001 From: hblankenship Date: Tue, 15 Oct 2024 17:41:20 -0500 Subject: [PATCH] create base class, re-use code import, reimport --- dojo/api_v2/serializers.py | 319 ++++++++++--------------------------- 1 file changed, 86 insertions(+), 233 deletions(-) diff --git a/dojo/api_v2/serializers.py b/dojo/api_v2/serializers.py index 48395f748fd..368a2657359 100644 --- a/dojo/api_v2/serializers.py +++ b/dojo/api_v2/serializers.py @@ -2047,7 +2047,7 @@ def get_findings_list(self, obj) -> List[int]: return obj.open_findings_list -class ImportScanSerializer(serializers.Serializer): +class CommonImportScanSerializer(serializers.Serializer): scan_date = serializers.DateField( required=False, help_text="Scan completion date will be used on all findings.", @@ -2064,6 +2064,7 @@ class ImportScanSerializer(serializers.Serializer): verified = serializers.BooleanField( help_text="Override the verified setting from the tool.", ) + scan_type = serializers.ChoiceField(choices=get_choices_sorted()) # TODO: why do we allow only existing endpoints? endpoint_to_add = serializers.PrimaryKeyRelatedField( @@ -2085,9 +2086,7 @@ class ImportScanSerializer(serializers.Serializer): required=False, help_text="Resource link to source code", ) - engagement = serializers.PrimaryKeyRelatedField( - queryset=Engagement.objects.all(), required=False, - ) + test_title = serializers.CharField(required=False) auto_create_context = serializers.BooleanField(required=False) deduplication_on_engagement = serializers.BooleanField(required=False) @@ -2147,9 +2146,6 @@ class ImportScanSerializer(serializers.Serializer): # extra fields populated in response # need to use the _id suffix as without the serializer framework gets # confused - test = serializers.IntegerField( - read_only=True, - ) # left for backwards compatibility test_id = serializers.IntegerField(read_only=True) engagement_id = serializers.IntegerField(read_only=True) product_id = serializers.IntegerField(read_only=True) @@ -2164,6 +2160,88 @@ class ImportScanSerializer(serializers.Serializer): required=False, ) + def get_importer( + self, + **kwargs: dict, + ) -> BaseImporter: + """ + Returns a new instance of an importer that extends + the BaseImporter class + """ + return DefaultImporter(**kwargs) + + def process_scan( + self, + data: dict, + context: dict, + ) -> None: + """ + Process the scan with all of the supplied data fully massaged + into the format we are expecting + + Raises exceptions in the event of an error + """ + try: + importer = self.get_importer(**context) + context["test"], _, _, _, _, _, _ = importer.process_scan( + context.pop("scan", None), + ) + # Update the response body with some new data + if test := context.get("test"): + data["test"] = test.id + data["test_id"] = test.id + data["engagement_id"] = test.engagement.id + data["product_id"] = test.engagement.product.id + data["product_type_id"] = test.engagement.product.prod_type.id + data["statistics"] = {"after": test.statistics} + # convert to exception otherwise django rest framework will swallow them as 400 error + # exceptions are already logged in the importer + except SyntaxError as se: + raise Exception(se) + except ValueError as ve: + raise Exception(ve) + + def validate(self, data: dict) -> dict: + scan_type = data.get("scan_type") + file = data.get("file") + if not file and requires_file(scan_type): + msg = f"Uploading a Report File is required for {scan_type}" + raise serializers.ValidationError(msg) + if file and is_scan_file_too_large(file): + msg = f"Report file is too large. Maximum supported size is {settings.SCAN_FILE_MAX_SIZE} MB" + raise serializers.ValidationError(msg) + tool_type = requires_tool_type(scan_type) + if tool_type: + api_scan_configuration = data.get("api_scan_configuration") + if ( + api_scan_configuration + and tool_type + != api_scan_configuration.tool_configuration.tool_type.name + ): + msg = f"API scan configuration must be of tool type {tool_type}" + raise serializers.ValidationError(msg) + return data + + def validate_scan_date(self, value: str) -> None: + if value and value > timezone.localdate(): + msg = "The scan_date cannot be in the future!" + raise serializers.ValidationError(msg) + return value + + +class ImportScanSerializer(CommonImportScanSerializer): + + engagement = serializers.PrimaryKeyRelatedField( + queryset=Engagement.objects.all(), required=False, + ) + + # extra fields populated in response + # need to use the _id suffix as without the serializer framework gets + # confused + test = serializers.IntegerField( + read_only=True, + ) # left for backwards compatibility + def set_context( self, data: dict, @@ -2242,47 +2320,6 @@ def process_auto_create_create_context( # Raise an explicit drf exception here raise ValidationError(str(e)) - def get_importer( - self, - **kwargs: dict, - ) -> BaseImporter: - """ - Returns a new instance of an importer that extends - the BaseImporter class - """ - return DefaultImporter(**kwargs) - - def process_scan( - self, - data: dict, - context: dict, - ) -> None: - """ - Process the scan with all of the supplied data fully massaged - into the format we are expecting - - Raises exceptions in the event of an error - """ - try: - importer = self.get_importer(**context) - context["test"], _, _, _, _, _, _ = importer.process_scan( - context.pop("scan", None), - ) - # Update the response body with some new data - if test := context.get("test"): - data["test"] = test.id - data["test_id"] = test.id - data["engagement_id"] = test.engagement.id - data["product_id"] = test.engagement.product.id - data["product_type_id"] = test.engagement.product.prod_type.id - data["statistics"] = {"after": test.statistics} - # convert to exception otherwise django rest framework will swallow them as 400 error - # exceptions are already logged in the importer - except SyntaxError as se: - raise Exception(se) - except ValueError as ve: - raise Exception(ve) - def save(self, push_to_jira=False): # Go through the validate method data = self.validated_data @@ -2293,163 +2330,16 @@ def save(self, push_to_jira=False): # Import the scan with all of the supplied data self.process_scan(data, context) - def validate(self, data: dict) -> dict: - scan_type = data.get("scan_type") - file = data.get("file") - if not file and requires_file(scan_type): - msg = f"Uploading a Report File is required for {scan_type}" - raise serializers.ValidationError(msg) - if file and is_scan_file_too_large(file): - msg = f"Report file is too large. Maximum supported size is {settings.SCAN_FILE_MAX_SIZE} MB" - raise serializers.ValidationError(msg) - tool_type = requires_tool_type(scan_type) - if tool_type: - api_scan_configuration = data.get("api_scan_configuration") - if ( - api_scan_configuration - and tool_type - != api_scan_configuration.tool_configuration.tool_type.name - ): - msg = f"API scan configuration must be of tool type {tool_type}" - raise serializers.ValidationError(msg) - return data - - def validate_scan_date(self, value: str) -> None: - if value and value > timezone.localdate(): - msg = "The scan_date cannot be in the future!" - raise serializers.ValidationError(msg) - return value +class ReImportScanSerializer(TaggitSerializer, CommonImportScanSerializer): -class ReImportScanSerializer(TaggitSerializer, serializers.Serializer): - scan_date = serializers.DateField( - required=False, - help_text="Scan completion date will be used on all findings.", - ) - minimum_severity = serializers.ChoiceField( - choices=SEVERITY_CHOICES, - default="Info", - help_text="Minimum severity level to be imported", - ) - active = serializers.BooleanField( - help_text="Override the active setting from the tool.", - ) - verified = serializers.BooleanField( - help_text="Override the verified setting from the tool.", - ) help_do_not_reactivate = "Select if the import should ignore active findings from the report, useful for triage-less scanners. Will keep existing findings closed, without reactivating them. For more information check the docs." do_not_reactivate = serializers.BooleanField( default=False, required=False, help_text=help_do_not_reactivate, ) - scan_type = serializers.ChoiceField( - choices=get_choices_sorted(), required=True, - ) - endpoint_to_add = serializers.PrimaryKeyRelatedField( - queryset=Endpoint.objects.all(), - required=False, - default=None, - help_text="Enter the ID of an Endpoint that is associated with the target Product. New Findings will be added to that Endpoint.", - ) - file = serializers.FileField(allow_empty_file=True, required=False) - product_type_name = serializers.CharField(required=False) - product_name = serializers.CharField(required=False) - engagement_name = serializers.CharField(required=False) - engagement_end_date = serializers.DateField( - required=False, - help_text="End Date for Engagement. Default is current time + 365 days. Required format year-month-day", - ) - source_code_management_uri = serializers.URLField( - max_length=600, - required=False, - help_text="Resource link to source code", - ) test = serializers.PrimaryKeyRelatedField( required=False, queryset=Test.objects.all(), ) - test_title = serializers.CharField(required=False) - auto_create_context = serializers.BooleanField(required=False) - deduplication_on_engagement = serializers.BooleanField(required=False) - - push_to_jira = serializers.BooleanField(default=False) - # Close the old findings if the parameter is not provided. This is to - # mentain the old API behavior after reintroducing the close_old_findings parameter - # also for ReImport. - close_old_findings = serializers.BooleanField( - required=False, - default=True, - help_text="Select if old findings no longer present in the report get closed as mitigated when importing.", - ) - close_old_findings_product_scope = serializers.BooleanField( - required=False, - default=False, - help_text="Select if close_old_findings applies to all findings of the same type in the product. " - "By default, it is false meaning that only old findings of the same type in the engagement are in scope. " - "Note that this only applies on the first call to reimport-scan.", - ) - version = serializers.CharField( - required=False, - help_text="Version that will be set on existing Test object. Leave empty to leave existing value in place.", - ) - build_id = serializers.CharField( - required=False, help_text="ID of the build that was scanned.", - ) - branch_tag = serializers.CharField( - required=False, help_text="Branch or Tag that was scanned.", - ) - commit_hash = serializers.CharField( - required=False, help_text="Commit that was scanned.", - ) - api_scan_configuration = serializers.PrimaryKeyRelatedField( - allow_null=True, - default=None, - queryset=Product_API_Scan_Configuration.objects.all(), - ) - service = serializers.CharField( - required=False, - help_text="A service is a self-contained piece of functionality within a Product. " - "This is an optional field which is used in deduplication and closing of old findings when set. " - "This affects the whole engagement/product depending on your deduplication scope.", - ) - environment = serializers.CharField(required=False) - lead = serializers.PrimaryKeyRelatedField( - allow_null=True, default=None, queryset=User.objects.all(), - ) - tags = TagListSerializerField( - required=False, - allow_empty=True, - help_text="Modify existing tags that help describe this scan. (Existing test tags will be overwritten)", - ) - - group_by = serializers.ChoiceField( - required=False, - choices=Finding_Group.GROUP_BY_OPTIONS, - help_text="Choose an option to automatically group new findings by the chosen option.", - ) - create_finding_groups_for_all_findings = serializers.BooleanField( - help_text="If set to false, finding groups will only be created when there is more than one grouped finding", - required=False, - default=True, - ) - - # extra fields populated in response - # need to use the _id suffix as without the serializer framework gets - # confused - test_id = serializers.IntegerField(read_only=True) - engagement_id = serializers.IntegerField( - read_only=True, - ) # need to use the _id suffix as without the serializer framework gets confused - product_id = serializers.IntegerField(read_only=True) - product_type_id = serializers.IntegerField(read_only=True) - - statistics = ImportStatisticsSerializer(read_only=True, required=False) - apply_tags_to_findings = serializers.BooleanField( - help_text="If set to True, the tags will be applied to the findings", - required=False, - ) - apply_tags_to_endpoints = serializers.BooleanField( - help_text="If set to True, the tags will be applied to the endpoints", - required=False, - ) def set_context( self, @@ -2529,16 +2419,6 @@ def process_auto_create_create_context( # Raise an explicit drf exception here raise ValidationError(str(e)) - def get_importer( - self, - **kwargs: dict, - ) -> BaseImporter: - """ - Returns a new instance of an importer that extends - the BaseImporter class - """ - return DefaultImporter(**kwargs) - def get_reimporter( self, **kwargs: dict, @@ -2617,33 +2497,6 @@ def save(self, push_to_jira=False): # Import the scan with all of the supplied data self.process_scan(auto_create_manager, data, context) - def validate(self, data): - scan_type = data.get("scan_type") - file = data.get("file") - if not file and requires_file(scan_type): - msg = f"Uploading a Report File is required for {scan_type}" - raise serializers.ValidationError(msg) - if file and is_scan_file_too_large(file): - msg = f"Report file is too large. Maximum supported size is {settings.SCAN_FILE_MAX_SIZE} MB" - raise serializers.ValidationError(msg) - tool_type = requires_tool_type(scan_type) - if tool_type: - api_scan_configuration = data.get("api_scan_configuration") - if ( - api_scan_configuration - and tool_type - != api_scan_configuration.tool_configuration.tool_type.name - ): - msg = f"API scan configuration must be of tool type {tool_type}" - raise serializers.ValidationError(msg) - return data - - def validate_scan_date(self, value): - if value and value > timezone.localdate(): - msg = "The scan_date cannot be in the future!" - raise serializers.ValidationError(msg) - return value - class EndpointMetaImporterSerializer(serializers.Serializer): file = serializers.FileField(required=True)