diff --git a/dojo/api_v2/serializers.py b/dojo/api_v2/serializers.py index c9f73c97ce..de0e6a49de 100644 --- a/dojo/api_v2/serializers.py +++ b/dojo/api_v2/serializers.py @@ -417,6 +417,51 @@ class Meta: fields = "__all__" +class MetadataSerializer(serializers.Serializer): + name = serializers.CharField(max_length=120) + value = serializers.CharField(max_length=300) + + +class MetaMainSerializer(serializers.Serializer): + id = serializers.IntegerField(read_only=True) + + product = serializers.PrimaryKeyRelatedField( + queryset=Product.objects.all(), + required=False, + default=None, + allow_null=True, + ) + endpoint = serializers.PrimaryKeyRelatedField( + queryset=Endpoint.objects.all(), + required=False, + default=None, + allow_null=True, + ) + finding = serializers.PrimaryKeyRelatedField( + queryset=Finding.objects.all(), + required=False, + default=None, + allow_null=True, + ) + metadata = MetadataSerializer(many=True) + + def validate(self, data): + product_id = data.get("product", None) + endpoint_id = data.get("endpoint", None) + finding_id = data.get("finding", None) + metadata = data.get("metadata") + + for item in metadata: + # this will only verify that one and only one of product, endpoint, or finding is passed... + DojoMeta(product=product_id, + endpoint=endpoint_id, + finding=finding_id, + name=item.get("name"), + value=item.get("value")).clean() + + return data + + class ProductMetaSerializer(serializers.ModelSerializer): class Meta: model = DojoMeta diff --git a/dojo/api_v2/views.py b/dojo/api_v2/views.py index e6dd30dab4..6caea55747 100644 --- a/dojo/api_v2/views.py +++ b/dojo/api_v2/views.py @@ -1650,6 +1650,61 @@ class DojoMetaViewSet( def get_queryset(self): return get_authorized_dojo_meta(Permissions.Product_View) + @extend_schema( + methods=["post", "patch"], + request=serializers.MetaMainSerializer, + responses={status.HTTP_200_OK: serializers.MetaMainSerializer}, + filters=False, + ) + @action( + detail=False, methods=["post", "patch"], pagination_class=None, + ) + def batch(self, request, pk=None): + serialized_data = serializers.MetaMainSerializer(data=request.data) + if serialized_data.is_valid(raise_exception=True): + if request.method == "POST": + self.process_post(request.data) + if request.method == "PATCH": + self.process_patch(request.data) + + return Response(status=status.HTTP_201_CREATED, data=serialized_data.data) + + def process_post(self: object, data: dict): + product = Product.objects.filter(id=data.get("product")).first() + finding = Finding.objects.filter(id=data.get("finding")).first() + endpoint = Endpoint.objects.filter(id=data.get("endpoint")).first() + metalist = data.get("metadata") + for metadata in metalist: + try: + DojoMeta.objects.create( + product=product, + finding=finding, + endpoint=endpoint, + name=metadata.get("name"), + value=metadata.get("value"), + ) + except (IntegrityError) as ex: # this should not happen as the data was validated in the batch call + raise ValidationError(str(ex)) + + def process_patch(self: object, data: dict): + product = Product.objects.filter(id=data.get("product")).first() + finding = Finding.objects.filter(id=data.get("finding")).first() + endpoint = Endpoint.objects.filter(id=data.get("endpoint")).first() + metalist = data.get("metadata") + for metadata in metalist: + dojometa = DojoMeta.objects.filter(product=product, finding=finding, endpoint=endpoint, name=metadata.get("name")) + if dojometa: + try: + dojometa.update( + name=metadata.get("name"), + value=metadata.get("value"), + ) + except (IntegrityError) as ex: + raise ValidationError(str(ex)) + else: + msg = f"Metadata {metadata.get('name')} not found for object." + raise ValidationError(msg) + @extend_schema_view(**schema_with_prefetch()) class ProductViewSet(