diff --git a/open_api_client_build/build_client.py b/open_api_client_build/build_client.py
index e8f6cfd6..e5080350 100644
--- a/open_api_client_build/build_client.py
+++ b/open_api_client_build/build_client.py
@@ -55,24 +55,24 @@ def fix_spec(src: str, path: str | os.PathLike):
             "you have to provide a file or url keyword as 1st arg."
         )
     spec_paths = spec["paths"]
-    if (
-        octet_schema := spec_paths.get(
-            "/projects/{projectId}/testruns/{testRunId}/actions/importXUnitTestResults",
-            {},
-        )
-        .get("post", {})
-        .get("requestBody", {})
-        .get("content", {})
-        .get("application/octet-stream", {})
-        .get("schema")
+    if octet_schema := (
+        spec_paths.get(
+            "/projects/{projectId}/testruns/{testRunId}/actions/importXUnitTestResults",
+            {},
+        )
+        .get("post", {})
+        .get("requestBody", {})
+        .get("content", {})
+        .get("application/octet-stream", {})
+        .get("schema")
     ):
         if octet_schema.get("type") == "object":
             octet_schema["type"] = "string"
             octet_schema["format"] = "binary"
     for spec_path in spec_paths.values():
         for operation_description in spec_path.values():
             if responses := operation_description.get("responses"):
                 if "4XX-5XX" in responses:
                     for code, resp in responses.items():
                         if error_code_pattern.fullmatch(code):
@@ -80,40 +80,33 @@ def fix_spec(src: str, path: str | os.PathLike):
                 del responses["4XX-5XX"]
 
     schemas = spec["components"]["schemas"]
-    if (
-        downloads := schemas.get("jobsSingleGetResponse", {})
-        .get("properties", {})
-        .get("data", {})
-        .get("properties", {})
-        .get("links", {})
-        .get("properties", {})
-        .get("downloads")
+    if downloads := (
+        schemas.get("jobsSingleGetResponse", {})
+        .get("properties", {})
+        .get("data", {})
+        .get("properties", {})
+        .get("links", {})
+        .get("properties", {})
+        .get("downloads")
     ):
         if "items" not in downloads and downloads.get("type") == "array":
             downloads["items"] = {"type": "string"}
-    if (
-        downloads := schemas.get("jobsSinglePostResponse", {})
-        .get("properties", {})
-        .get("data", {})
-        .get("properties", {})
-        .get("links", {})
-        .get("properties", {})
-        .get("downloads")
+    if downloads := (
+        schemas.get("jobsSinglePostResponse", {})
+        .get("properties", {})
+        .get("data", {})
+        .get("properties", {})
+        .get("links", {})
+        .get("properties", {})
+        .get("downloads")
     ):
         if "items" not in downloads and downloads.get("type") == "array":
             downloads["items"] = {"type": "string"}
-    if (
-        error_source := schemas.get("errors", {})
-        .get("properties", {})
-        .get("errors", {})
-        .get("items", {})
-        .get("properties", {})
-        .get("source")
+    if error_source := (
+        schemas.get("errors", {})
+        .get("properties", {})
+        .get("errors", {})
+        .get("items", {})
+        .get("properties", {})
+        .get("source")
     ):
         error_source["nullable"] = True
         if resource := error_source.get("properties", {}).get("resource"):
             resource["nullable"] = True
 
     with tempfile.NamedTemporaryFile("w", delete=False) as f:
diff --git a/polarion_rest_api_client/__init__.py b/polarion_rest_api_client/__init__.py
index 9e1a98a9..ba4c206a 100644
--- a/polarion_rest_api_client/__init__.py
+++ b/polarion_rest_api_client/__init__.py
@@ -9,7 +9,6 @@
 __version__ = "0.0.0+unknown"
 del metadata
 
-from polarion_rest_api_client.client import OpenAPIPolarionProjectClient
 from polarion_rest_api_client.data_models import (
     SelectTestCasesBy,
     TestRecord,
@@ -26,3 +25,4 @@
PolarionApiUnexpectedException, PolarionWorkItemException, ) +from polarion_rest_api_client.old_client import OpenAPIPolarionProjectClient diff --git a/polarion_rest_api_client/base_client.py b/polarion_rest_api_client/base_client.py deleted file mode 100644 index d944ca6a..00000000 --- a/polarion_rest_api_client/base_client.py +++ /dev/null @@ -1,88 +0,0 @@ -# Copyright DB InfraGO AG and contributors -# SPDX-License-Identifier: Apache-2.0 -"""Base class for a polarion project client to easily rewrite the client.""" -from __future__ import annotations - -import abc -import typing as t - -from polarion_rest_api_client import data_models as dm - -WorkItemType = t.TypeVar("WorkItemType", bound=dm.WorkItem) - - -class DefaultFields: - """A class to define default values for the fields parameter.""" - - _workitems: str = "@basic" - _linkedworkitems: str = "id,role,suspect" - _workitem_attachments: str = "@basic" - _documents: str = "@basic" - _testrecords: str = "@basic" - _testruns: str = "@basic" - - @property - def workitems(self): - """Return the fields dict for workitems.""" - return {"workitems": self._workitems} - - @workitems.setter - def workitems(self, value): - self._workitems = value - - @property - def linkedworkitems(self): - """Return the fields dict for linkedworkitems.""" - return {"linkedworkitems": self._linkedworkitems} - - @linkedworkitems.setter - def linkedworkitems(self, value): - self._linkedworkitems = value - - @property - def workitem_attachments(self): - """Return the fields dict for workitem_attachments.""" - return {"workitem_attachments": self._workitem_attachments} - - @workitem_attachments.setter - def workitem_attachments(self, value): - self._workitem_attachments = value - - @property - def documents(self): - """Return the fields dict for document.""" - return {"documents": self._documents} - - @documents.setter - def documents(self, value): - self._documents = value - - @property - def testruns(self): - """Return the fields dict for document.""" - return {"testruns": self._testruns} - - @testruns.setter - def testruns(self, value): - self._testruns = value - - @property - def testrecords(self): - """Return the fields dict for document.""" - return {"testrecords": self._testrecords} - - @testrecords.setter - def testrecords(self, value): - self._testrecords = value - - @property - def all_types(self): - """Return all fields dicts merged together.""" - return ( - self.workitem_attachments - | self.workitems - | self.linkedworkitems - | self.documents - | self.testruns - | self.testrecords - ) diff --git a/polarion_rest_api_client/client.py b/polarion_rest_api_client/client.py index 0798cf73..03239433 100644 --- a/polarion_rest_api_client/client.py +++ b/polarion_rest_api_client/client.py @@ -1,1583 +1,133 @@ # Copyright DB InfraGO AG and contributors # SPDX-License-Identifier: Apache-2.0 -"""The actual implementation of the API client using an OpenAPIClient.""" +"""Base class for a polarion project client to easily rewrite the client.""" from __future__ import annotations -import datetime -import io -import json -import logging -import os -import random -import time import typing as t -import urllib.parse -from polarion_rest_api_client import base_client +import polarion_rest_api_client.open_api_client as oa_client from polarion_rest_api_client import data_models as dm -from polarion_rest_api_client import errors -from polarion_rest_api_client.base_client import WorkItemType -from polarion_rest_api_client.open_api_client import client as oa_client -from 
polarion_rest_api_client.open_api_client import models as api_models -from polarion_rest_api_client.open_api_client import types as oa_types -from polarion_rest_api_client.open_api_client.api.documents import get_document -from polarion_rest_api_client.open_api_client.api.linked_work_items import ( - delete_linked_work_items, - get_linked_work_items, - post_linked_work_items, -) -from polarion_rest_api_client.open_api_client.api.projects import get_project -from polarion_rest_api_client.open_api_client.api.test_records import ( - get_test_records, - patch_test_record, - post_test_records, -) -from polarion_rest_api_client.open_api_client.api.test_runs import ( - get_test_runs, - patch_test_run, - post_test_runs, -) -from polarion_rest_api_client.open_api_client.api.work_item_attachments import ( # pylint: disable=line-too-long - delete_work_item_attachment, - get_work_item_attachments, - patch_work_item_attachment, - post_work_item_attachments, -) -from polarion_rest_api_client.open_api_client.api.work_items import ( - delete_work_items, - get_work_item, - get_work_items, - patch_work_item, - post_work_items, -) - -logger = logging.getLogger(__name__) -T = t.TypeVar("T", str, int, float, datetime.datetime, bool, None) - - -def _get_json_content_size(data: dict): - return len(json.dumps(data).encode("utf-8")) - - -min_wi_request_size = _get_json_content_size( - api_models.WorkitemsListPostRequest(data=[]).to_dict() -) - - -def _build_sparse_fields( - fields_dict: dict[str, str] -) -> api_models.SparseFields | oa_types.Unset: - """Build the SparseFields object based on a dict. - - Ensure that every key follow the pattern 'fields[XXX]'. - """ - new_field_dict: dict[str, str] = {} - for key, value in fields_dict.items(): - if key.startswith("fields["): - new_field_dict[key] = value - else: - new_field_dict[f"fields[{key}]"] = value - return api_models.SparseFields.from_dict(new_field_dict) - - -@t.overload -def unset_to_none(value: oa_types.Unset) -> None: - """Return None if value is Unset, else the value.""" - - -@t.overload -def unset_to_none(value: T) -> T: - """Return None if value is Unset, else the value.""" - - -def unset_to_none(value: t.Any) -> t.Any: - """Return None if value is Unset, else the value.""" - if isinstance(value, oa_types.Unset): - return None - return value - - -def sleep_random_time(_min: int = 5, _max: int = 15): - """Sleep for _min-_max seconds with defaults of 5-15 seconds.""" - time.sleep(random.uniform(_min, _max)) - - -class OpenAPIPolarionProjectClient(t.Generic[WorkItemType]): - """A Polarion Project Client using an auto generated OpenAPI-Client.""" - - _batch_size: int = 5 - _page_size: int = 100 - delete_status: str = "deleted" - client: oa_client.AuthenticatedClient - - @t.overload - def __init__( - self: "OpenAPIPolarionProjectClient[base_client.WorkItemType]", - project_id: str, - delete_polarion_work_items: bool, - polarion_api_endpoint: str, - polarion_access_token: str, - *, - custom_work_item: type[base_client.WorkItemType], - batch_size: int = ..., - page_size: int = ..., - add_work_item_checksum: bool = False, - max_content_size: int = ..., - httpx_args: t.Optional[dict[str, t.Any]] = ..., - ): ... 
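For orientation: this client.py rewrite replaces the monolithic `OpenAPIPolarionProjectClient` with a connection-level `PolarionClient` plus project-scoped clients obtained from `get_project_client` (added below). A minimal usage sketch of the new split, assuming only what this diff shows — the endpoint URL, token and project ID are placeholders, not values from the change:

```python
from polarion_rest_api_client.client import PolarionClient

# Connection-wide settings (endpoint, token, paging and batching
# defaults) now live on the PolarionClient.
polarion = PolarionClient(
    "https://polarion.example.com/polarion/rest/v1",  # placeholder endpoint
    "MY_PERSONAL_ACCESS_TOKEN",  # placeholder token
    batch_size=100,
    page_size=100,
)

# Project-specific behaviour moves to a ProjectClient. A non-None
# delete_status makes delete() set that status instead of sending a
# real DELETE request (see StatusItemClient further down).
project = polarion.get_project_client("MY_PROJECT", delete_status="deleted")
```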
+from polarion_rest_api_client.clients import projects
+
+WorkItemType = t.TypeVar("WorkItemType", bound=dm.WorkItem)
+
+
+class DefaultFields:
+    """A class to define default values for the fields parameter."""
+
+    _workitems: str = "@basic"
+    _linkedworkitems: str = "id,role,suspect"
+    _workitem_attachments: str = "@basic"
+    _documents: str = "@basic"
+    _testrecords: str = "@basic"
+    _testruns: str = "@basic"
+
+    @property
+    def workitems(self):
+        """Return the fields dict for workitems."""
+        return {"workitems": self._workitems}
+
+    @workitems.setter
+    def workitems(self, value):
+        self._workitems = value
+
+    @property
+    def linkedworkitems(self):
+        """Return the fields dict for linkedworkitems."""
+        return {"linkedworkitems": self._linkedworkitems}
+
+    @linkedworkitems.setter
+    def linkedworkitems(self, value):
+        self._linkedworkitems = value
+
+    @property
+    def workitem_attachments(self):
+        """Return the fields dict for workitem_attachments."""
+        return {"workitem_attachments": self._workitem_attachments}
+
+    @workitem_attachments.setter
+    def workitem_attachments(self, value):
+        self._workitem_attachments = value
+
+    @property
+    def documents(self):
+        """Return the fields dict for documents."""
+        return {"documents": self._documents}
+
+    @documents.setter
+    def documents(self, value):
+        self._documents = value
+
+    @property
+    def testruns(self):
+        """Return the fields dict for testruns."""
+        return {"testruns": self._testruns}
+
+    @testruns.setter
+    def testruns(self, value):
+        self._testruns = value
+
+    @property
+    def testrecords(self):
+        """Return the fields dict for testrecords."""
+        return {"testrecords": self._testrecords}
+
+    @testrecords.setter
+    def testrecords(self, value):
+        self._testrecords = value
+
+    @property
+    def all_types(self):
+        """Return all fields dicts merged together."""
+        return (
+            self.workitem_attachments
+            | self.workitems
+            | self.linkedworkitems
+            | self.documents
+            | self.testruns
+            | self.testrecords
+        )
 
-    @t.overload
-    def __init__(
-        self: "OpenAPIPolarionProjectClient[dm.WorkItem]",
-        project_id: str,
-        delete_polarion_work_items: bool,
-        polarion_api_endpoint: str,
-        polarion_access_token: str,
-        *,
-        batch_size: int = ...,
-        page_size: int = ...,
-        add_work_item_checksum: bool = False,
-        max_content_size: int = ...,
-        httpx_args: t.Optional[dict[str, t.Any]] = ...,
-    ): ...
 
+class PolarionClient:
     def __init__(
         self,
-        project_id: str,
-        delete_polarion_work_items: bool,
         polarion_api_endpoint: str,
         polarion_access_token: str,
-        *,
-        custom_work_item=dm.WorkItem,
+        httpx_args: t.Optional[dict[str, t.Any]] = None,
         batch_size: int = 100,
         page_size: int = 100,
-        add_work_item_checksum: bool = False,
         max_content_size: int = 2 * 1024**2,
-        httpx_args: t.Optional[dict[str, t.Any]] = None,
     ):
-        """Initialize the client for project and endpoint using a token.
-
-        Parameters
-        ----------
-        project_id : str
-            ID of the project to create a client for.
-        delete_polarion_work_items : bool
-            Flag indicating whether to delete work items or set a status.
-        polarion_api_endpoint : str
-            The URL of the Polarion API endpoint.
-        polarion_access_token : str
-            A personal access token to access the API.
-        custom_work_item : default dm.WorkItem
-            Custom WorkItem class with additional attributes.
-        batch_size : int, default 100
-            Maximum amount of items created in one POST request.
-        page_size : int, default 100
-            Default size of a page when getting items from the API.
-        add_work_item_checksum : bool, default False
-            Flag whether to post WorkItem checksums.
- max_content_size : int, default 2 * 1024**2 - Maximum content-length of the API (default: 2MB). - httpx_args: t.Optional[dict[str, t.Any]], default None - Additional parameters, which will be passed to the httpx client. - """ - self.project_id = project_id - self.delete_polarion_work_items = delete_polarion_work_items - self.default_fields = base_client.DefaultFields() - self._batch_size = batch_size - self._page_size = page_size - self._work_item = custom_work_item - self.add_work_item_checksum = add_work_item_checksum - - if httpx_args is None: - httpx_args = {} - - if "proxies" not in httpx_args: - httpx_args["proxies"] = os.getenv("PROXIES") - self.client = oa_client.AuthenticatedClient( - polarion_api_endpoint, polarion_access_token, httpx_args=httpx_args - ) - self._batch_size = batch_size - self._page_size = page_size - self._max_content_size = max_content_size - - def _check_response( - self, response: oa_types.Response, _raise: bool - ) -> bool: - def unexpected_error(): - return errors.PolarionApiUnexpectedException( - response.status_code, response.content - ) - - if response.status_code not in range(400, 600): - return True - - if not _raise: - logger.warning( - "Received error response code %d with content %s.", - response.status_code, - response.content, - ) - return False - - if ( - isinstance(response.parsed, api_models.Errors) - and response.parsed.errors - ): - raise errors.PolarionApiException( - *[ - ( - e.status, - e.detail, - ( - e.source.pointer - if not ( - isinstance(e.source, oa_types.Unset) - or e.source is None - ) - else "No error pointer" - ), - ) - for e in response.parsed.errors - ] - ) - raise unexpected_error() - - def _build_work_item_post_request( - self, work_item: base_client.WorkItemType - ) -> api_models.WorkitemsListPostRequestDataItem: - assert work_item.type is not None - assert work_item.title is not None - assert work_item.description is not None - assert work_item.status is not None - - attrs = api_models.WorkitemsListPostRequestDataItemAttributes( - type=work_item.type, - description=api_models.WorkitemsListPostRequestDataItemAttributesDescription( # pylint: disable=line-too-long - type=api_models.WorkitemsListPostRequestDataItemAttributesDescriptionType( # pylint: disable=line-too-long - work_item.description_type - ), - value=work_item.description, - ), - status=work_item.status, - title=work_item.title, - ) - - attrs.additional_properties.update(work_item.additional_attributes) - - if self.add_work_item_checksum: - attrs.additional_properties["checksum"] = ( - work_item.calculate_checksum() - ) - - return api_models.WorkitemsListPostRequestDataItem( - type=api_models.WorkitemsListPostRequestDataItemType.WORKITEMS, - attributes=attrs, - ) - - def _build_work_item_patch_request( - self, work_item: base_client.WorkItemType - ) -> api_models.WorkitemsSinglePatchRequest: - attrs = api_models.WorkitemsSinglePatchRequestDataAttributes() - - if work_item.title is not None: - attrs.title = work_item.title - - if work_item.description is not None: - attrs.description = api_models.WorkitemsSinglePatchRequestDataAttributesDescription( # pylint: disable=line-too-long - type=api_models.WorkitemsSinglePatchRequestDataAttributesDescriptionType( # pylint: disable=line-too-long - work_item.description_type - ), - value=work_item.description, - ) - - if work_item.status is not None: - attrs.status = work_item.status - - attrs.additional_properties.update(work_item.additional_attributes) - - if self.add_work_item_checksum: - 
attrs.additional_properties["checksum"] = ( - work_item.get_current_checksum() - ) - - return api_models.WorkitemsSinglePatchRequest( - data=api_models.WorkitemsSinglePatchRequestData( - type=api_models.WorkitemsSinglePatchRequestDataType.WORKITEMS, - id=f"{self.project_id}/{work_item.id}", - attributes=attrs, - ) - ) - - def _post_work_item_batch( - self, - work_item_batch: api_models.WorkitemsListPostRequest, - work_item_objs: list[base_client.WorkItemType], - retry: bool = True, - ): - response = post_work_items.sync_detailed( - self.project_id, client=self.client, body=work_item_batch - ) - if not self._check_response(response, not retry) and retry: - sleep_random_time() - self._post_work_item_batch(work_item_batch, work_item_objs, False) - return - - assert ( - isinstance(response.parsed, api_models.WorkitemsListPostResponse) - and response.parsed.data - ) - counter = 0 - for work_item_res in response.parsed.data: - assert work_item_res.id - work_item_objs[counter].id = work_item_res.id.split("/")[-1] - counter += 1 - - def _calculate_post_work_item_request_sizes( - self, - work_item_data: api_models.WorkitemsListPostRequestDataItem, - current_content_size: int = min_wi_request_size, - ) -> t.Tuple[int, bool]: - work_item_size = _get_json_content_size(work_item_data.to_dict()) - - proj_content_size = current_content_size + work_item_size - if current_content_size != min_wi_request_size: - proj_content_size += len(b", ") - - return ( - proj_content_size, - (work_item_size + min_wi_request_size) > self._max_content_size, - ) - - def project_exists(self) -> bool: - """Return True if self.project_id exists and False if not.""" - response = get_project.sync_detailed( - self.project_id, client=self.client - ) - if not response.status_code == 200: - logger.error("Polarion request: %s", response.content) - return False - return True - - def get_work_item_attachments( - self, - work_item_id: str, - fields: dict[str, str] | None = None, - page_size: int = 100, - page_number: int = 1, - retry: bool = True, - ) -> tuple[list[dm.WorkItemAttachment], bool]: - """Return the attachments for a given work item on a defined page. - - In addition, a flag whether a next page is available is - returned. Define a fields dictionary as described in the - Polarion API documentation to get certain fields. 
- """ - if fields is None: - fields = self.default_fields.workitem_attachments - - sparse_fields = _build_sparse_fields(fields) - response = get_work_item_attachments.sync_detailed( - self.project_id, - work_item_id=work_item_id, - client=self.client, - fields=sparse_fields, - pagesize=page_size, - pagenumber=page_number, - ) - - if not self._check_response(response, not retry) and retry: - sleep_random_time() - return self.get_work_item_attachments( - work_item_id, fields, page_size, page_number, False - ) - - parsed_response = response.parsed - - work_item_attachments: list[dm.WorkItemAttachment] = [] - - next_page = False - - if ( - isinstance( - parsed_response, api_models.WorkitemAttachmentsListGetResponse - ) - and parsed_response.data - ): - for attachment in parsed_response.data: - assert attachment.attributes - assert isinstance(attachment.attributes.id, str) - - work_item_attachments.append( - dm.WorkItemAttachment( - work_item_id, - attachment.attributes.id, - unset_to_none(attachment.attributes.title), - file_name=unset_to_none( - attachment.attributes.file_name - ), - ) - ) - - next_page = isinstance( - parsed_response.links, - api_models.WorkitemAttachmentsListGetResponseLinks, - ) and bool(parsed_response.links.next_) - - return work_item_attachments, next_page - - def delete_work_item_attachment( - self, work_item_attachment: dm.WorkItemAttachment, retry: bool = True - ): - """Delete the given work item attachment.""" - response = delete_work_item_attachment.sync_detailed( - self.project_id, - work_item_attachment.work_item_id, - work_item_attachment.id, - client=self.client, - ) - if not self._check_response(response, not retry) and retry: - sleep_random_time() - self.delete_work_item_attachment(work_item_attachment, False) - - def update_work_item_attachment( - self, work_item_attachment: dm.WorkItemAttachment, retry: bool = True - ): - """Update the given work item attachment in Polarion.""" - attributes = ( - api_models.WorkitemAttachmentsSinglePatchRequestDataAttributes() - ) - if work_item_attachment.title: - attributes.title = work_item_attachment.title - - multipart = api_models.PatchWorkItemAttachmentsRequestBody( - resource=api_models.WorkitemAttachmentsSinglePatchRequest( - data=api_models.WorkitemAttachmentsSinglePatchRequestData( - type=api_models.WorkitemAttachmentsSinglePatchRequestDataType.WORKITEM_ATTACHMENTS, # pylint: disable=line-too-long - id=f"{self.project_id}/{work_item_attachment.work_item_id}/{work_item_attachment.id}", # pylint: disable=line-too-long - attributes=attributes, - ) - ) - ) - - if work_item_attachment.content_bytes: - multipart.content = oa_types.File( - io.BytesIO(work_item_attachment.content_bytes), - work_item_attachment.file_name, - work_item_attachment.mime_type, - ) - - response = patch_work_item_attachment.sync_detailed( - self.project_id, - work_item_attachment.work_item_id, - work_item_attachment.id, - client=self.client, - body=multipart, - ) - if not self._check_response(response, not retry) and retry: - sleep_random_time() - self.update_work_item_attachment(work_item_attachment, False) - - def create_work_item_attachments( - self, - work_item_attachments: list[dm.WorkItemAttachment], - retry: bool = True, - ): - """Create the given work item attachment in Polarion.""" - attachment_attributes = [] - attachment_files = [] - assert len(work_item_attachments), "No attachments were provided." 
- assert all( - [wia.work_item_id == work_item_attachments[0].work_item_id] - for wia in work_item_attachments - ), "All attachments must belong to the same WorkItem." - - for work_item_attachment in work_item_attachments: - assert ( - work_item_attachment.file_name - ), "You have to define a FileName." - assert ( - work_item_attachment.content_bytes - ), "You have to provide content bytes." - assert ( - work_item_attachment.mime_type - ), "You have to provide a mime_type." - - attributes = api_models.WorkitemAttachmentsListPostRequestDataItemAttributes( # pylint: disable=line-too-long - file_name=work_item_attachment.file_name - ) - if work_item_attachment.title: - attributes.title = work_item_attachment.title - - attachment_attributes.append( - api_models.WorkitemAttachmentsListPostRequestDataItem( - type=api_models.WorkitemAttachmentsListPostRequestDataItemType.WORKITEM_ATTACHMENTS, # pylint: disable=line-too-long - attributes=attributes, - ) - ) - - attachment_files.append( - oa_types.File( - io.BytesIO(work_item_attachment.content_bytes), - work_item_attachment.file_name, - work_item_attachment.mime_type, - ) - ) - - multipart = api_models.PostWorkItemAttachmentsRequestBody( - resource=api_models.WorkitemAttachmentsListPostRequest( - attachment_attributes - ), - files=attachment_files, - ) - - response = post_work_item_attachments.sync_detailed( - self.project_id, - work_item_attachments[0].work_item_id, - client=self.client, - body=multipart, - ) - if not self._check_response(response, not retry) and retry: - sleep_random_time() - self.create_work_item_attachments(work_item_attachments, False) - return - - assert ( - isinstance( - response.parsed, api_models.WorkitemAttachmentsListPostResponse - ) - and response.parsed.data - ) - counter = 0 - for work_item_attachment_res in response.parsed.data: - assert work_item_attachment_res.id - work_item_attachments[counter].id = ( - work_item_attachment_res.id.split("/")[-1] - ) - counter += 1 - - def get_work_items( - self, - query: str = "", - fields: dict[str, str] | None = None, - page_size: int = 100, - page_number: int = 1, - retry: bool = True, - ) -> tuple[list[base_client.WorkItemType], bool]: - """Return the work items on a defined page matching the given query. - - In addition, a flag whether a next page is available is - returned. Define a fields dictionary as described in the - Polarion API documentation to get certain fields. 
- """ - if fields is None: - fields = self.default_fields.workitems - - sparse_fields = _build_sparse_fields(fields) - response = get_work_items.sync_detailed( - self.project_id, - client=self.client, - fields=sparse_fields, - query=query, - pagesize=page_size, - pagenumber=page_number, - ) - - if not self._check_response(response, not retry) and retry: - sleep_random_time() - return self.get_work_items( - query, fields, page_size, page_number, False - ) - - work_items_response = response.parsed - - work_items: list[base_client.WorkItemType] = [] - - next_page = False - if ( - isinstance( - work_items_response, api_models.WorkitemsListGetResponse - ) - and work_items_response.data - ): - work_items = [ - self._generate_work_item(work_item) - for work_item in work_items_response.data - if not getattr(work_item.meta, "errors", []) - ] - - next_page = isinstance( - work_items_response.links, - api_models.WorkitemsListGetResponseLinks, - ) and bool(work_items_response.links.next_) - - return work_items, next_page - - def get_work_item( - self, - work_item_id: str, - retry: bool = True, - ) -> base_client.WorkItemType | None: - """Return one specific work item with all fields. - - This also includes all linked work items and attachments. If - there are to many of these to get them in one request, the - truncated flags for linked_work_items and attachments will be - set to True. - """ - response = get_work_item.sync_detailed( - self.project_id, - work_item_id, - client=self.client, - fields=_build_sparse_fields( - { - "workitems": "@all", - "workitem_attachments": "@all", - "linkedworkitems": "@all", - } - ), - ) - if not self._check_response(response, not retry) and retry: - sleep_random_time() - return self.get_work_item(work_item_id, False) - - if isinstance( - response.parsed, api_models.WorkitemsSingleGetResponse - ) and isinstance( - response.parsed.data, api_models.WorkitemsSingleGetResponseData - ): - return self._generate_work_item(response.parsed.data) - - return None - - def _generate_work_item( - self, - work_item: ( - api_models.WorkitemsListGetResponseDataItem - | api_models.WorkitemsSingleGetResponseData - ), - ) -> base_client.WorkItemType: - assert work_item.attributes - assert isinstance(work_item.id, str) - work_item_id = work_item.id.split("/")[-1] - work_item_links = [] - work_item_attachments = [] - - # We set both truncated flags to True and will only set them to False, - # if the corresponding fields were requested and returned completely - links_truncated = True - attachments_truncated = True - if work_item.relationships: - if links := work_item.relationships.linked_work_items: - if not links.meta or links.meta.total_count is oa_types.UNSET: - links_truncated = False - - work_item_links = [ - self._parse_work_item_link( - link.id, - link.additional_properties.get("suspect", False), - work_item_id, - ) - for link in links.data or [] - ] - - if attachments := work_item.relationships.attachments: - if ( - not attachments.meta - or attachments.meta.total_count is oa_types.UNSET - ): - attachments_truncated = False - - work_item_attachments = [ - dm.WorkItemAttachment( - work_item_id, - attachment.id.split("/")[-1], - None, # title isn't provided - ) - for attachment in attachments.data or [] - if attachment.id - ] - - desctype = None - desc = None - if work_item.attributes.description: - desctype = unset_to_none(work_item.attributes.description.type) - desc = unset_to_none(work_item.attributes.description.value) - - work_item_obj = self._work_item( - work_item_id, - 
unset_to_none(work_item.attributes.title), - desctype, - desc, - unset_to_none(work_item.attributes.type), - unset_to_none(work_item.attributes.status), - work_item.attributes.additional_properties, - work_item_links, - work_item_attachments, - links_truncated, - attachments_truncated, + polarion_api_endpoint, + polarion_access_token, + httpx_args=httpx_args or {}, ) - return work_item_obj + self.batch_size = batch_size + self.page_size = page_size + self.max_content_size = max_content_size + self.default_fields = DefaultFields() - def get_document( - self, - space_id: str, - document_name: str, - fields: dict[str, str] | None = None, - include: str | None | oa_types.Unset = None, - revision: str | None | oa_types.Unset = None, - retry: bool = True, - ) -> dm.Document | None: - """Return the document with the given document_name and space_id.""" - if include is None: - include = oa_types.UNSET - - if revision is None: - revision = oa_types.UNSET - - if " " in space_id or " " in document_name: - space_id = urllib.parse.quote( - space_id, safe="/", encoding=None, errors=None - ) - document_name = urllib.parse.quote( - document_name, safe="/", encoding=None, errors=None - ) - if fields is None: - fields = self.default_fields.documents - - sparse_fields = _build_sparse_fields(fields) - response = get_document.sync_detailed( - self.project_id, - space_id, - document_name, - client=self.client, - fields=sparse_fields, - include=include, - revision=revision, - ) - - if not self._check_response(response, not retry) and retry: - sleep_random_time() - return self.get_document( - space_id, document_name, fields, include, revision, False - ) - - document_response = response.parsed - - if isinstance( - document_response, api_models.DocumentsSingleGetResponse - ) and (data := document_response.data): - if not getattr(data.meta, "errors", []): - assert (attributes := data.attributes) - assert isinstance(data.id, str) - home_page_content = self._handle_text_content( - attributes.home_page_content - ) - - return dm.Document( - id=data.id, - module_folder=unset_to_none(attributes.module_folder), - module_name=unset_to_none(attributes.module_name), - type=unset_to_none(attributes.type), - status=unset_to_none(attributes.status), - home_page_content=home_page_content, - ) - return None - - def _handle_text_content( - self, - polarion_content: ( - api_models.DocumentsSingleGetResponseDataAttributesHomePageContent # pylint: disable=line-too-long - | api_models.TestrecordsListGetResponseDataItemAttributesComment - | api_models.TestrunsListGetResponseDataItemAttributesHomePageContent - | oa_types.Unset - ), - ) -> dm.TextContent | None: - if not polarion_content: - return None - - return dm.TextContent( - type=str(polarion_content.type) if polarion_content.type else None, - value=polarion_content.value or None, - ) - - def create_work_items(self, work_items: list[base_client.WorkItemType]): - """Create the given list of work items.""" - current_batch = api_models.WorkitemsListPostRequest(data=[]) - content_size = min_wi_request_size - batch_start_index = 0 - batch_end_index = 0 - - for work_item in work_items: - work_item_data = self._build_work_item_post_request(work_item) - - ( - proj_content_size, - too_big, - ) = self._calculate_post_work_item_request_sizes( - work_item_data, content_size - ) - - if too_big: - raise errors.PolarionWorkItemException( - "A WorkItem is too large to create.", work_item - ) - - assert isinstance(current_batch.data, list) - if ( - proj_content_size >= self._max_content_size - or 
len(current_batch.data) >= self._batch_size - ): - self._post_work_item_batch( - current_batch, - work_items[batch_start_index:batch_end_index], - ) - - current_batch = api_models.WorkitemsListPostRequest( - data=[work_item_data] - ) - content_size = _get_json_content_size(current_batch.to_dict()) - batch_start_index = batch_end_index - else: - assert isinstance(current_batch.data, list) - current_batch.data.append(work_item_data) - content_size = proj_content_size - - batch_end_index += 1 - - if current_batch.data: - self._post_work_item_batch( - current_batch, work_items[batch_start_index:] - ) - - def _delete_work_items(self, work_item_ids: list[str], retry: bool = True): - response = delete_work_items.sync_detailed( - self.project_id, - client=self.client, - body=api_models.WorkitemsListDeleteRequest( - data=[ - api_models.WorkitemsListDeleteRequestDataItem( - type=api_models.WorkitemsListDeleteRequestDataItemType.WORKITEMS, # pylint: disable=line-too-long - id=f"{self.project_id}/{work_item_id}", - ) - for work_item_id in work_item_ids - ] - ), - ) - - if not self._check_response(response, not retry) and retry: - sleep_random_time() - self._delete_work_items(work_item_ids, False) - - def update_work_item( - self, work_item: base_client.WorkItemType, retry: bool = True - ): - """Update the given work item in Polarion. - - Only fields not set to None will be updated in Polarion. None - fields will stay untouched. - """ - assert work_item.id is not None - if work_item.type: - logger.warning( - "Attempting to change the type of Work Item %s to %s.", - work_item.id, - work_item.type, - ) - - response = patch_work_item.sync_detailed( - self.project_id, - work_item.id, - client=self.client, - change_type_to=work_item.type or oa_types.UNSET, - body=self._build_work_item_patch_request(work_item), - ) - - if not self._check_response(response, not retry) and retry: - sleep_random_time() - self.update_work_item(work_item, False) - - def get_work_item_links( - self, - work_item_id: str, - fields: dict[str, str] | None = None, - include: str | None | oa_types.Unset = None, - page_size: int = 100, - page_number: int = 1, - retry: bool = True, - ) -> tuple[list[dm.WorkItemLink], bool]: - """Get the work item links for the given work item on a page. - - In addition, a flag whether a next page is available is - returned. Define a fields dictionary as described in the - Polarion API documentation to get certain fields. 
- """ - if fields is None: - fields = self.default_fields.linkedworkitems - - if include is None: - include = oa_types.UNSET - - sparse_fields = _build_sparse_fields(fields) - response = get_linked_work_items.sync_detailed( - self.project_id, - work_item_id, - client=self.client, - fields=sparse_fields, - include=include, - pagesize=page_size, - pagenumber=page_number, - ) - - if not self._check_response(response, not retry) and retry: - sleep_random_time() - return self.get_work_item_links( - work_item_id, fields, include, page_size, page_number, False - ) - - linked_work_item_response = response.parsed - - work_item_links: list[dm.WorkItemLink] = [] - next_page = False - if ( - isinstance( - linked_work_item_response, - api_models.LinkedworkitemsListGetResponse, - ) - and linked_work_item_response.data - ): - for link in linked_work_item_response.data: - assert isinstance(link.id, str) - assert isinstance( - link.attributes, - api_models.LinkedworkitemsListGetResponseDataItemAttributes, # pylint: disable=line-too-long - ) - - work_item_links.append( - self._parse_work_item_link( - link.id, link.attributes.suspect, work_item_id - ) - ) - - next_page = isinstance( - linked_work_item_response.links, - api_models.LinkedworkitemsListGetResponseLinks, - ) and bool(linked_work_item_response.links.next_) - - return work_item_links, next_page - - def _parse_work_item_link(self, link_id, suspect, work_item_id): - info = link_id.split("/") - assert len(info) == 5 - role_id, target_project_id, linked_work_item_id = info[2:] - if isinstance(suspect, oa_types.Unset): - suspect = False - work_item_link = dm.WorkItemLink( - work_item_id, - linked_work_item_id, - role_id, - suspect, - target_project_id, - ) - return work_item_link - - def _create_work_item_links( - self, work_item_links: list[dm.WorkItemLink], retry: bool = True - ): - response = post_linked_work_items.sync_detailed( - self.project_id, - work_item_links[0].primary_work_item_id, - client=self.client, - # pylint: disable=line-too-long - body=api_models.LinkedworkitemsListPostRequest( - data=[ - api_models.LinkedworkitemsListPostRequestDataItem( - type=api_models.LinkedworkitemsListPostRequestDataItemType.LINKEDWORKITEMS, - attributes=api_models.LinkedworkitemsListPostRequestDataItemAttributes( - role=work_item_link.role, - suspect=work_item_link.suspect or False, - ), - relationships=api_models.LinkedworkitemsListPostRequestDataItemRelationships( - work_item=api_models.LinkedworkitemsListPostRequestDataItemRelationshipsWorkItem( - data=api_models.LinkedworkitemsListPostRequestDataItemRelationshipsWorkItemData( - type=api_models.LinkedworkitemsListPostRequestDataItemRelationshipsWorkItemDataType.WORKITEMS, - id=f"{work_item_link.secondary_work_item_project}/{work_item_link.secondary_work_item_id}", - ) - ) - ), - ) - for work_item_link in work_item_links - ] - ), - # pylint: enable=line-too-long - ) - - if not self._check_response(response, not retry) and retry: - sleep_random_time() - self._create_work_item_links(work_item_links, False) - - def _delete_work_item_links( - self, work_item_links: list[dm.WorkItemLink], retry: bool = True - ): - response = delete_linked_work_items.sync_detailed( - self.project_id, - work_item_links[0].primary_work_item_id, - client=self.client, - # pylint: disable=line-too-long - body=api_models.LinkedworkitemsListDeleteRequest( - data=[ - api_models.LinkedworkitemsListDeleteRequestDataItem( - type=api_models.LinkedworkitemsListDeleteRequestDataItemType.LINKEDWORKITEMS, - 
id=f"{self.project_id}/{work_item_link.primary_work_item_id}/{work_item_link.role}/{work_item_link.secondary_work_item_project}/{work_item_link.secondary_work_item_id}", - ) - for work_item_link in work_item_links - ] - ), - # pylint: enable=line-too-long - ) - - if not self._check_response(response, not retry) and retry: - sleep_random_time() - self._delete_work_item_links(work_item_links, False) - - def get_test_records( - self, - test_run_id: str, - fields: dict[str, str] | None = None, - page_size: int = 100, - page_number: int = 1, - retry: bool = True, - ) -> tuple[list[dm.TestRecord], bool]: - """Return the test records on a defined page matching the given query. - - In addition, a flag whether a next page is available is - returned. Define a fields dictionary as described in the - Polarion API documentation to get certain fields. - """ - if fields is None: - fields = self.default_fields.testrecords - - sparse_fields = _build_sparse_fields(fields) - response = get_test_records.sync_detailed( - self.project_id, - test_run_id, - client=self.client, - fields=sparse_fields, - pagenumber=page_number, - pagesize=page_size, - ) - - if not self._check_response(response, not retry) and retry: - sleep_random_time() - return self.get_test_records( - test_run_id, fields, page_size, page_number, False - ) - - parsed_response = response.parsed - assert isinstance( - parsed_response, api_models.TestrecordsListGetResponse - ) - - test_records = [] - - for data in parsed_response.data or []: - assert isinstance(data.id, str) - assert isinstance( - data.attributes, - api_models.TestrecordsListGetResponseDataItemAttributes, - ) - _, _, project_id, work_item, iteration = data.id.split("/") - test_records.append( - dm.TestRecord( - project_id, - work_item, - unset_to_none(data.attributes.test_case_revision), - int(iteration), - ( - data.attributes.duration - if not isinstance( - data.attributes.duration, oa_types.Unset - ) - else -1 - ), - unset_to_none(data.attributes.result), - self._handle_text_content(data.attributes.comment), - unset_to_none(data.attributes.executed), - data.additional_properties or {}, - ) - ) - next_page = isinstance( - parsed_response.links, - api_models.TestrecordsListGetResponseLinks, - ) and bool(parsed_response.links.next_) - - return test_records, next_page - - def get_test_runs( - self, - query: str = "", - fields: dict[str, str] | None = None, - page_size: int = 100, - page_number: int = 1, - retry: bool = True, - ) -> tuple[list[dm.TestRun], bool]: - """Return the test runs on a defined page matching the given query. - - In addition, a flag whether a next page is available is - returned. Define a fields dictionary as described in the - Polarion API documentation to get certain fields. 
- """ - if fields is None: - fields = self.default_fields.testruns - - sparse_fields = _build_sparse_fields(fields) - response = get_test_runs.sync_detailed( - self.project_id, - client=self.client, - query=query, - fields=sparse_fields, - pagenumber=page_number, - pagesize=page_size, - ) - - if not self._check_response(response, not retry) and retry: - sleep_random_time() - return self.get_test_runs( - query, fields, page_size, page_number, False - ) - - parsed_response = response.parsed - assert isinstance(parsed_response, api_models.TestrunsListGetResponse) - - test_runs = [] - for data in parsed_response.data or []: - assert isinstance(data.id, str) - assert isinstance( - data.attributes, - api_models.TestrunsListGetResponseDataItemAttributes, - ) - test_runs.append( - dm.TestRun( - data.id.split("/")[-1], - unset_to_none(data.attributes.type), - unset_to_none(data.attributes.status), - unset_to_none(data.attributes.title), - self._handle_text_content( - data.attributes.home_page_content - ), - unset_to_none(data.attributes.finished_on), - unset_to_none(data.attributes.group_id), - unset_to_none(data.attributes.id_prefix), - unset_to_none(data.attributes.is_template), - unset_to_none(data.attributes.keep_in_history), - unset_to_none(data.attributes.query), - unset_to_none(data.attributes.keep_in_history), - ( - dm.SelectTestCasesBy( - str(data.attributes.select_test_cases_by) - ) - if data.attributes.select_test_cases_by - else None - ), - data.attributes.additional_properties or {}, - ) - ) - next_page = isinstance( - parsed_response.links, - api_models.TestrunsListGetResponseLinks, - ) and bool(parsed_response.links.next_) - - return test_runs, next_page - - def create_test_runs( - self, test_runs: list[dm.TestRun], retry: bool = True - ): - """Create the given list of test runs.""" - polarion_test_runs = [ - api_models.TestrunsListPostRequestDataItem( - type=api_models.TestrunsListPostRequestDataItemType.TESTRUNS, - attributes=self._fill_test_run_attributes( - api_models.TestrunsListPostRequestDataItemAttributes, - test_run, - ), - ) - for test_run in test_runs - ] - - response = post_test_runs.sync_detailed( - self.project_id, - client=self.client, - body=api_models.TestrunsListPostRequest(polarion_test_runs), - ) - - if not self._check_response(response, not retry) and retry: - sleep_random_time() - self.create_test_runs(test_runs, False) - - def update_test_run(self, test_run: dm.TestRun, retry: bool = True): - """Create the given list of test runs.""" - assert test_run.id - response = patch_test_run.sync_detailed( - self.project_id, - test_run.id, - client=self.client, - body=api_models.TestrunsSinglePatchRequest( - data=api_models.TestrunsSinglePatchRequestData( - type=api_models.TestrunsSinglePatchRequestDataType.TESTRUNS, # pylint: disable=line-too-long - id=f"{self.project_id}/{test_run.id}", - attributes=self._fill_test_run_attributes( - api_models.TestrunsSinglePatchRequestDataAttributes, - test_run, - ), - ) - ), - ) - - if not self._check_response(response, not retry) and retry: - sleep_random_time() - self.update_test_run(test_run, False) - - def _fill_test_run_attributes( - self, - attributes_type: type[ - api_models.TestrunsListPostRequestDataItemAttributes - | api_models.TestrunsSinglePatchRequestDataAttributes - ], - test_run: dm.TestRun, - ): - type_prefix = attributes_type.__name__ - attributes = attributes_type() - if test_run.type is not None: - attributes.type = test_run.type - if test_run.id and hasattr(attributes, "id"): - attributes.id = test_run.id - if 
test_run.status is not None: - attributes.status = test_run.status - if test_run.title is not None: - attributes.title = test_run.title - if test_run.finished_on is not None: - attributes.finished_on = test_run.finished_on - if test_run.group_id is not None: - attributes.group_id = test_run.group_id - if test_run.id_prefix is not None: - attributes.id_prefix = test_run.id_prefix - if test_run.is_template is not None and hasattr( - attributes, "is_template" - ): - attributes.is_template = test_run.is_template - if test_run.keep_in_history is not None: - attributes.keep_in_history = test_run.keep_in_history - if test_run.query is not None: - attributes.query = test_run.query - if test_run.use_report_from_template is not None: - attributes.use_report_from_template = ( - test_run.use_report_from_template - ) - if test_run.additional_attributes: - attributes.additional_properties = test_run.additional_attributes - if test_run.select_test_cases_by: - attributes.select_test_cases_by = getattr( - api_models, f"{type_prefix}SelectTestCasesBy" - )(test_run.select_test_cases_by.value) - if test_run.home_page_content: - attributes.home_page_content = getattr( - api_models, f"{type_prefix}HomePageContent" - )() - assert attributes.home_page_content - if test_run.home_page_content.type: - attributes.home_page_content.type = getattr( - api_models, f"{type_prefix}HomePageContentType" - )(test_run.home_page_content.type) - if test_run.home_page_content.value: - attributes.home_page_content.value = ( - test_run.home_page_content.value - ) - - return attributes - - def _fill_test_record_attributes( - self, - attributes_type: type[ - api_models.TestrecordsListPostRequestDataItemAttributes - | api_models.TestrecordsSinglePatchRequestDataAttributes - ], - test_record: dm.TestRecord, - ): - type_prefix = attributes_type.__name__ - attributes = attributes_type() - if test_record.result: - attributes.result = test_record.result - if test_record.comment: - attributes.comment = getattr(api_models, f"{type_prefix}Comment")() - assert attributes.comment - if test_record.comment.type: - attributes.comment.type = getattr( - api_models, f"{type_prefix}CommentType" - )(test_record.comment.type) - if test_record.comment.value: - attributes.comment.value = test_record.comment.value - if test_record.duration != -1: - attributes.duration = test_record.duration - if test_record.work_item_revision: - attributes.test_case_revision = test_record.work_item_revision - if test_record.executed: - attributes.executed = test_record.executed - if test_record.additional_attributes: - attributes.additional_properties = ( - test_record.additional_attributes - ) - return attributes - - def create_test_records( - self, - test_run_id: str, - test_records: list[dm.TestRecord], - retry: bool = True, - ): - """Create the given list of test records.""" - response = post_test_records.sync_detailed( - self.project_id, - test_run_id, - client=self.client, - # pylint: disable=line-too-long - body=api_models.TestrecordsListPostRequest( - [ - api_models.TestrecordsListPostRequestDataItem( - type=api_models.TestrecordsListPostRequestDataItemType.TESTRECORDS, - attributes=self._fill_test_record_attributes( - api_models.TestrecordsListPostRequestDataItemAttributes, - test_record, - ), - relationships=api_models.TestrecordsListPostRequestDataItemRelationships( - test_case=api_models.TestrecordsListPostRequestDataItemRelationshipsTestCase( - data=api_models.TestrecordsListPostRequestDataItemRelationshipsTestCaseData( - 
type=api_models.TestrecordsListPostRequestDataItemRelationshipsTestCaseDataType.WORKITEMS, - id=f"{test_record.work_item_project_id}/{test_record.work_item_id}", - ) - ) - ), - ) - for test_record in test_records - ] - ), - # pylint: enable=line-too-long - ) - - if not self._check_response(response, not retry) and retry: - sleep_random_time() - self.create_test_records(test_run_id, test_records, False) - - assert ( - isinstance(response.parsed, api_models.TestrecordsListPostResponse) - and response.parsed.data - ) - counter = 0 - for response_item in response.parsed.data: - if response_item.id: - test_records[counter].iteration = int( - response_item.id.split("/")[-1] - ) - counter += 1 - - def update_test_record( - self, test_run_id: str, test_record: dm.TestRecord, retry: bool = True - ): - """Create the given list of test records.""" - response = patch_test_record.sync_detailed( - self.project_id, - test_run_id, - test_record.work_item_project_id, - test_record.work_item_id, - str(test_record.iteration), - client=self.client, - # pylint: disable=line-too-long - body=api_models.TestrecordsSinglePatchRequest( - data=api_models.TestrecordsSinglePatchRequestData( - type=api_models.TestrecordsSinglePatchRequestDataType.TESTRECORDS, - id=f"{self.project_id}/{test_run_id}/{test_record.work_item_project_id}/{test_record.work_item_id}/{test_record.iteration}", - attributes=self._fill_test_record_attributes( - api_models.TestrecordsSinglePatchRequestDataAttributes, - test_record, - ), - ) - ), - # pylint: enable=line-too-long - ) - - if not self._check_response(response, not retry) and retry: - sleep_random_time() - self.update_test_record(test_run_id, test_record, False) - - def _request_all_items(self, call: t.Callable, **kwargs) -> list[t.Any]: + def request_all_items(self, call: t.Callable, **kwargs) -> list[t.Any]: page = 1 items, next_page = call( - **kwargs, page_size=self._page_size, page_number=page + **kwargs, page_size=self.page_size, page_number=page ) while next_page: page += 1 _items, next_page = call( - **kwargs, page_size=self._page_size, page_number=page + **kwargs, page_size=self.page_size, page_number=page ) items += _items return items - def get_all_work_item_attachments( - self, work_item_id: str, fields: dict[str, str] | None = None - ) -> list[dm.WorkItemAttachment]: - """Get all work item attachments for a given work item. - - Will handle pagination automatically. Define a fields dictionary - as described in the Polarion API documentation to get certain - fields. - """ - return self._request_all_items( - self.get_work_item_attachments, - fields=fields, - work_item_id=work_item_id, - ) - - def create_work_item_attachment( - self, work_item_attachment: dm.WorkItemAttachment, retry: bool = True - ): - """Update the given work item attachment in Polarion.""" - self.create_work_item_attachments([work_item_attachment], retry) - - def get_all_work_items( - self, query: str = "", fields: dict[str, str] | None = None - ) -> list[WorkItemType]: - """Get all work items matching the given query. - - Will handle pagination automatically. Define a fields dictionary - as described in the Polarion API documentation to get certain - fields. 
- """ - return self._request_all_items( - self.get_work_items, fields=fields, query=query - ) - - def create_work_item(self, work_item: WorkItemType): - """Create a single given work item.""" - self.create_work_items([work_item]) - - def delete_work_items(self, work_item_ids: list[str]): - """Delete or mark the defined work items as deleted.""" - if self.delete_polarion_work_items: - return self._delete_work_items(work_item_ids) - return self._mark_delete_work_items(work_item_ids) - - def delete_work_item(self, work_item_id: str): - """Delete or mark the defined work item as deleted.""" - return self.delete_work_items([work_item_id]) - - def _mark_delete_work_items(self, work_item_ids: list[str]): - """Set the status for all given work items to self.delete_status.""" - for work_item_id in work_item_ids: - self.update_work_item( - self._work_item(id=work_item_id, status=self.delete_status) - ) - - def get_all_work_item_links( + def get_project_client( self, - work_item_id: str, - fields: dict[str, str] | None = None, - include: str | None = None, - ) -> list[dm.WorkItemLink]: - """Get all work item links for the given work item. - - Define a fields dictionary as described in the Polarion API - documentation to get certain fields. - """ - return self._request_all_items( - self.get_work_item_links, - work_item_id=work_item_id, - fields=fields, - include=include, - ) - - def create_work_item_links(self, work_item_links: list[dm.WorkItemLink]): - """Create the links between the work items in work_item_links.""" - for split_work_item_links in self._group_links( - work_item_links - ).values(): - for i in range(0, len(split_work_item_links), self._batch_size): - self._create_work_item_links( - split_work_item_links[i : i + self._batch_size] - ) - - def _set_project(self, work_item_link: dm.WorkItemLink): - if work_item_link.secondary_work_item_project is None: - work_item_link.secondary_work_item_project = self.project_id - - def _group_links( - self, - work_item_links: list[dm.WorkItemLink], - ) -> dict[str, list[dm.WorkItemLink]]: - """Group a list of work item links by their primary work item. - - Returns a dict with the primary work items as keys. - """ - work_item_link_dict: dict[str, list[dm.WorkItemLink]] = {} - for work_item_link in work_item_links: - self._set_project(work_item_link) - if work_item_link.primary_work_item_id not in work_item_link_dict: - work_item_link_dict[work_item_link.primary_work_item_id] = [] - - work_item_link_dict[work_item_link.primary_work_item_id].append( - work_item_link - ) - return work_item_link_dict - - def create_work_item_link(self, work_item_link: dm.WorkItemLink): - """Create the link between the work items in work_item_link.""" - self._set_project(work_item_link) - self._create_work_item_links([work_item_link]) - - def delete_work_item_links(self, work_item_links: list[dm.WorkItemLink]): - """Delete the links between the work items in work_item_link.""" - for split_work_item_links in self._group_links( - work_item_links - ).values(): - self._delete_work_item_links(split_work_item_links) - - def delete_work_item_link(self, work_item_link: dm.WorkItemLink): - """Delete the links between the work items in work_item_link.""" - self._set_project(work_item_link) - self._delete_work_item_links([work_item_link]) - - def get_all_test_runs( - self, - query: str = "", - fields: dict[str, str] | None = None, - ) -> list[dm.TestRun]: - """Get all test runs matching the given query. - - Will handle pagination automatically. 
Define a fields dictionary - as described in the Polarion API documentation to get certain - fields. - """ - return self._request_all_items( - self.get_test_runs, fields=fields, query=query - ) - - def get_all_test_records( - self, - test_run_id: str, - fields: dict[str, str] | None = None, - ) -> list[dm.TestRecord]: - """Get all test records matching the given query. - - Will handle pagination automatically. Define a fields dictionary - as described in the Polarion API documentation to get certain - fields. - """ - return self._request_all_items( - self.get_test_records, fields=fields, test_run_id=test_run_id + project_id: str, + delete_status: str | None = None, + add_work_item_checksum: bool = False, + ): + return projects.ProjectClient( + project_id, self, delete_status, add_work_item_checksum ) - - def create_test_run(self, test_run: dm.TestRun): - """Create the given test run.""" - self.create_test_runs([test_run]) - - def create_test_record(self, test_run_id: str, test_record: dm.TestRecord): - """Create the given list of test records.""" - self.create_test_records(test_run_id, [test_record]) diff --git a/polarion_rest_api_client/clients/base_classes.py b/polarion_rest_api_client/clients/base_classes.py new file mode 100644 index 00000000..6f950446 --- /dev/null +++ b/polarion_rest_api_client/clients/base_classes.py @@ -0,0 +1,214 @@ +# Copyright DB InfraGO AG and contributors +# SPDX-License-Identifier: Apache-2.0 + +import abc +import logging +import random +import time +import typing as t + +from polarion_rest_api_client import data_models as dm +from polarion_rest_api_client import errors +from polarion_rest_api_client.open_api_client import models as api_models +from polarion_rest_api_client.open_api_client import types as oa_types + +T = t.TypeVar("T") +ST = t.TypeVar("ST", bound=dm.StatusItem) +logger = logging.getLogger(__name__) +_min_sleep = 5 +_max_sleep = 15 + + +class BaseClient(abc.ABC): + def __init__( + self, project_id: str, client: "polarion_client.PolarionClient" + ): + self._project_id = project_id + self._client = client + + def _handle_text_content( + self, + polarion_content: ( + api_models.DocumentsSingleGetResponseDataAttributesHomePageContent # pylint: disable=line-too-long + | api_models.TestrecordsListGetResponseDataItemAttributesComment + | api_models.TestrunsListGetResponseDataItemAttributesHomePageContent + | oa_types.Unset + ), + ) -> dm.TextContent | None: + if not polarion_content: + return None + + return dm.TextContent( + type=str(polarion_content.type) if polarion_content.type else None, + value=polarion_content.value or None, + ) + + def _build_sparse_fields( + self, fields_dict: dict[str, str] + ) -> api_models.SparseFields | oa_types.Unset: + """Build the SparseFields object based on a dict. + + Ensure that every key follow the pattern 'fields[XXX]'. 
+ """ + new_field_dict: dict[str, str] = {} + for key, value in fields_dict.items(): + if key.startswith("fields["): + new_field_dict[key] = value + else: + new_field_dict[f"fields[{key}]"] = value + return api_models.SparseFields.from_dict(new_field_dict) + + @t.overload + def unset_to_none(self, value: oa_types.Unset) -> None: + """Return None if value is Unset, else the value.""" + + @t.overload + def unset_to_none(self, value: T) -> T: + """Return None if value is Unset, else the value.""" + + def unset_to_none(self, value: t.Any) -> t.Any: + """Return None if value is Unset, else the value.""" + if isinstance(value, oa_types.Unset): + return None + return value + + def _raise_on_error(self, response: oa_types.Response): + def unexpected_error(): + return errors.PolarionApiUnexpectedException( + response.status_code, response.content + ) + + if response.status_code not in range(400, 600): + return + + if ( + isinstance(response.parsed, api_models.Errors) + and response.parsed.errors + ): + raise errors.PolarionApiException( + *[ + ( + e.status, + e.detail, + ( + e.source.pointer + if not ( + isinstance(e.source, oa_types.Unset) + or e.source is None + ) + else "No error pointer" + ), + ) + for e in response.parsed.errors + ] + ) + raise unexpected_error() + + def _retry_on_error(self, call: t.Callable, *args: t.Any, **kwargs: t.Any): + try: + return call(*args, **kwargs) + except Exception as e: + logger.warning( + "Will retry after failing on first attempt, " + "due to the following error %s", + e, + ) + time.sleep(random.uniform(_min_sleep, _max_sleep)) + return call(*args, **kwargs) + + +class ItemsClient(BaseClient, t.Generic[T], abc.ABC): + @abc.abstractmethod + def _get_multi( + self, *args, page_size: int, page_number: int, **kwargs + ) -> tuple[list[T], bool]: ... + + def get_multi( + self, *args, page_size: int, page_number: int, **kwargs + ) -> tuple[list[T], bool]: + return self._retry_on_error( + self._get_multi, + *args, + page_size=page_size, + page_number=page_number, + **kwargs, + ) + + @abc.abstractmethod + def _get(self, *args, **kwargs) -> T: ... + + def get(self, *args, **kwargs) -> tuple[list[T], bool]: + return self._retry_on_error(self._get, *args, **kwargs) + + def get_all(self, *args, **kwargs) -> list[T]: + page = 1 + items, next_page = self.get_multi( + *args, page_size=self._client.page_size, page_number=page, **kwargs + ) + while next_page: + page += 1 + _items, next_page = self.get_multi( + *args, + page_size=self._client.page_size, + page_number=page, + **kwargs, + ) + items += _items + return items + + @abc.abstractmethod + def _create(self, items: list[T]): ... + + def _split_into_batches( + self, items: list[T] + ) -> t.Generator[list[T], None, None]: + for i in range(0, len(items), self._client.batch_size): + yield items[i : i + self._client.batch_size] + + def create(self, items: T | list[T]): + if not isinstance(items, list): + items = [items] + + for batch in self._split_into_batches(items): + self._retry_on_error(self._create, batch) + + @abc.abstractmethod + def _delete(self, items: list[T]): ... + + def delete(self, items: T | list[T]): + if not isinstance(items, list): + items = [items] + for batch in self._split_into_batches(items): + self._retry_on_error(self._delete, batch) + + +class UpdatableItemsClient(ItemsClient, t.Generic[T], abc.ABC): + @abc.abstractmethod + def _update(self, items: list[T]): ... 
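The `get_all` loop above leans on a simple contract: `_get_multi` returns one page of items together with a flag that tells whether another page exists. A minimal, self-contained sketch of that contract (the fake paged source below is hypothetical and only illustrates the loop, it is not part of this patch):

```python
# Hypothetical stand-in for an ItemsClient subclass: it only
# demonstrates the (items, next_page) contract that get_all consumes.
class _FakePagedSource:
    def __init__(self, items: list[int], page_size: int):
        self._items = items
        self.page_size = page_size

    def get_multi(
        self, *, page_size: int, page_number: int
    ) -> tuple[list[int], bool]:
        start = (page_number - 1) * page_size
        chunk = self._items[start : start + page_size]
        return chunk, start + page_size < len(self._items)


def get_all(source: _FakePagedSource) -> list[int]:
    # Same paging strategy as ItemsClient.get_all: fetch page 1, then
    # keep fetching while the flag reports another page.
    page = 1
    items, next_page = source.get_multi(
        page_size=source.page_size, page_number=page
    )
    while next_page:
        page += 1
        more, next_page = source.get_multi(
            page_size=source.page_size, page_number=page
        )
        items += more
    return items


assert get_all(_FakePagedSource(list(range(7)), 3)) == list(range(7))
```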
+ + def update(self, items: T | list[T]): + if not isinstance(items, list): + items = [items] + + self._update(items) + + +class StatusItemClient(UpdatableItemsClient, t.Generic[ST], abc.ABC): + def __init__( + self, + project_id: str, + client: "polarion_client.PolarionClient", + delete_status: str | None = None, + ): + super().__init__(project_id, client) + self.delete_status = delete_status + + def delete(self, items: ST | list[ST]): + if self.delete_status is None: + super().delete(items) + else: + if not isinstance(items, list): + items = [items] + for item in items: + item.status = self.delete_status + self.update(items) diff --git a/polarion_rest_api_client/clients/documents.py b/polarion_rest_api_client/clients/documents.py new file mode 100644 index 00000000..71f5232a --- /dev/null +++ b/polarion_rest_api_client/clients/documents.py @@ -0,0 +1,88 @@ +# Copyright DB InfraGO AG and contributors +# SPDX-License-Identifier: Apache-2.0 + +import urllib.parse + +from polarion_rest_api_client import data_models as dm +from polarion_rest_api_client.open_api_client import models as api_models +from polarion_rest_api_client.open_api_client import types as oa_types +from polarion_rest_api_client.open_api_client.api.documents import get_document + +from . import base_classes as bc + + +class Documents(bc.UpdatableItemsClient[dm.Document]): + def _get( + self, + space_id: str, + document_name: str, + fields: dict[str, str] | None = None, + include: str | None | oa_types.Unset = None, + revision: str | None | oa_types.Unset = None, + retry: bool = True, + ) -> dm.Document | None: + """Return the document with the given document_name and space_id.""" + if include is None: + include = oa_types.UNSET + + if revision is None: + revision = oa_types.UNSET + + if " " in space_id or " " in document_name: + space_id = urllib.parse.quote( + space_id, safe="/", encoding=None, errors=None + ) + document_name = urllib.parse.quote( + document_name, safe="/", encoding=None, errors=None + ) + if fields is None: + fields = self._client.default_fields.documents + + sparse_fields = self._build_sparse_fields(fields) + response = get_document.sync_detailed( + self._project_id, + space_id, + document_name, + client=self._client.client, + fields=sparse_fields, + include=include, + revision=revision, + ) + + self._raise_on_error(response) + + document_response = response.parsed + + if isinstance( + document_response, api_models.DocumentsSingleGetResponse + ) and (data := document_response.data): + if not getattr(data.meta, "errors", []): + assert (attributes := data.attributes) + assert isinstance(data.id, str) + home_page_content = self._handle_text_content( + attributes.home_page_content + ) + + return dm.Document( + id=data.id, + module_folder=self.unset_to_none(attributes.module_folder), + module_name=self.unset_to_none(attributes.module_name), + type=self.unset_to_none(attributes.type), + status=self.unset_to_none(attributes.status), + home_page_content=home_page_content, + ) + return None + + def _update(self, items: dm.Document | list[dm.Document]): + raise NotImplementedError + + def _get_multi( + self, *args, page_size, page_number, **kwargs + ) -> tuple[list[dm.Document], bool]: + raise NotImplementedError + + def _create(self, items: dm.Document | list[dm.Document]): + raise NotImplementedError + + def _delete(self, items: dm.Document | list[dm.Document]): + raise NotImplementedError diff --git a/polarion_rest_api_client/clients/projects.py b/polarion_rest_api_client/clients/projects.py new file mode 100644 
index 00000000..b0f00a49
--- /dev/null
+++ b/polarion_rest_api_client/clients/projects.py
@@ -0,0 +1,32 @@
+# Copyright DB InfraGO AG and contributors
+# SPDX-License-Identifier: Apache-2.0
+
+from polarion_rest_api_client.clients import base_classes as bc
+from polarion_rest_api_client.open_api_client.api.projects import get_project
+
+from . import documents, test_runs, work_items
+
+
+class ProjectClient(bc.BaseClient):
+    def __init__(
+        self,
+        project_id: str,
+        client: "polarion_client.PolarionClient",
+        delete_status: str | None = None,
+        add_work_item_checksum: bool = False,
+    ):
+        super().__init__(project_id, client)
+
+        self.work_items = work_items.WorkItems(
+            project_id, client, delete_status, add_work_item_checksum
+        )
+        self.test_runs = test_runs.TestRuns(project_id, client)
+        self.documents = documents.Documents(project_id, client)
+
+    def exists(self):
+        response = get_project.sync_detailed(
+            self._project_id, client=self._client.client
+        )
+        if response.status_code == 200:
+            return True
+        return False
diff --git a/polarion_rest_api_client/clients/test_records.py b/polarion_rest_api_client/clients/test_records.py
new file mode 100644
index 00000000..ae0eb641
--- /dev/null
+++ b/polarion_rest_api_client/clients/test_records.py
@@ -0,0 +1,209 @@
+# Copyright DB InfraGO AG and contributors
+# SPDX-License-Identifier: Apache-2.0
+
+import itertools
+import typing as t
+
+from polarion_rest_api_client import data_models as dm
+from polarion_rest_api_client.open_api_client import models as api_models
+from polarion_rest_api_client.open_api_client import types as oa_types
+from polarion_rest_api_client.open_api_client.api.test_records import (
+    get_test_records,
+    patch_test_record,
+    post_test_records,
+)
+
+from . import base_classes as bc
+from .base_classes import T
+
+
+class TestRecords(bc.UpdatableItemsClient[dm.TestRecord]):
+    def _get(self, *args, **kwargs) -> T:
+        raise NotImplementedError
+
+    def _update(self, items: list[dm.TestRecord]):
+        """Update the given list of test records."""
+        for item in items:
+            self._update_single(item)
+
+    def _update_single(self, test_record: dm.TestRecord):
+        response = patch_test_record.sync_detailed(
+            self._project_id,
+            test_record.test_run_id,
+            test_record.work_item_project_id,
+            test_record.work_item_id,
+            str(test_record.iteration),
+            client=self._client.client,
+            # pylint: disable=line-too-long
+            body=api_models.TestrecordsSinglePatchRequest(
+                data=api_models.TestrecordsSinglePatchRequestData(
+                    type=api_models.TestrecordsSinglePatchRequestDataType.TESTRECORDS,
+                    id=f"{self._project_id}/{test_record.test_run_id}/{test_record.work_item_project_id}/{test_record.work_item_id}/{test_record.iteration}",
+                    attributes=self._fill_test_record_attributes(
+                        api_models.TestrecordsSinglePatchRequestDataAttributes,
+                        test_record,
+                    ),
+                )
+            ),
+            # pylint: enable=line-too-long
+        )
+        self._raise_on_error(response)
+
+    def _get_multi(
+        self,
+        test_run_id: str,
+        fields: dict[str, str] | None = None,
+        page_size: int = 100,
+        page_number: int = 1,
+        retry: bool = True,
+    ) -> tuple[list[dm.TestRecord], bool]:
+        """Return the test records on a defined page matching the given query.
+
+        In addition, a flag whether a next page is available is
+        returned. Define a fields dictionary as described in the
+        Polarion API documentation to get certain fields.
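A hypothetical usage sketch for this page-wise getter, assuming a `ProjectClient` instance named `project_client` as constructed in `projects.py` above; the test run id and page size are made-up values:

```python
# Fetch test records page by page; get_multi returns one page plus a
# flag signalling whether a further page exists.
records, has_next = project_client.test_runs.records.get_multi(
    "my-test-run", page_size=50, page_number=1
)
if has_next:
    more, has_next = project_client.test_runs.records.get_multi(
        "my-test-run", page_size=50, page_number=2
    )
```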
+ """ + if fields is None: + fields = self._client.default_fields.testrecords + + sparse_fields = self._build_sparse_fields(fields) + response = get_test_records.sync_detailed( + self._project_id, + test_run_id, + client=self._client.client, + fields=sparse_fields, + pagenumber=page_number, + pagesize=page_size, + ) + + self._raise_on_error(response) + + parsed_response = response.parsed + assert isinstance( + parsed_response, api_models.TestrecordsListGetResponse + ) + + test_records = [] + + for data in parsed_response.data or []: + assert isinstance(data.id, str) + assert isinstance( + data.attributes, + api_models.TestrecordsListGetResponseDataItemAttributes, + ) + _, _, project_id, work_item, iteration = data.id.split("/") + test_records.append( + dm.TestRecord( + test_run_id, + project_id, + work_item, + self.unset_to_none(data.attributes.test_case_revision), + int(iteration), + ( + data.attributes.duration + if not isinstance( + data.attributes.duration, oa_types.Unset + ) + else -1 + ), + self.unset_to_none(data.attributes.result), + self._handle_text_content(data.attributes.comment), + self.unset_to_none(data.attributes.executed), + data.additional_properties or {}, + ) + ) + next_page = isinstance( + parsed_response.links, + api_models.TestrecordsListGetResponseLinks, + ) and bool(parsed_response.links.next_) + + return test_records, next_page + + def _split_into_batches( + self, items: list[dm.TestRecord] + ) -> t.Generator[list[dm.TestRecord], None, None]: + for _, group in itertools.groupby(items, lambda x: x.test_run_id): + yield from super()._split_into_batches(list(group)) + + def _create( + self, + test_records: list[dm.TestRecord], + ): + """Create the given list of test records.""" + response = post_test_records.sync_detailed( + self._project_id, + test_records[0].test_run_id, + client=self._client.client, + # pylint: disable=line-too-long + body=api_models.TestrecordsListPostRequest( + [ + api_models.TestrecordsListPostRequestDataItem( + type=api_models.TestrecordsListPostRequestDataItemType.TESTRECORDS, + attributes=self._fill_test_record_attributes( + api_models.TestrecordsListPostRequestDataItemAttributes, + test_record, + ), + relationships=api_models.TestrecordsListPostRequestDataItemRelationships( + test_case=api_models.TestrecordsListPostRequestDataItemRelationshipsTestCase( + data=api_models.TestrecordsListPostRequestDataItemRelationshipsTestCaseData( + type=api_models.TestrecordsListPostRequestDataItemRelationshipsTestCaseDataType.WORKITEMS, + id=f"{test_record.work_item_project_id}/{test_record.work_item_id}", + ) + ) + ), + ) + for test_record in test_records + ] + ), + # pylint: enable=line-too-long + ) + + self._raise_on_error(response) + + assert ( + isinstance(response.parsed, api_models.TestrecordsListPostResponse) + and response.parsed.data + ) + counter = 0 + for response_item in response.parsed.data: + if response_item.id: + test_records[counter].iteration = int( + response_item.id.split("/")[-1] + ) + counter += 1 + + def _delete(self, items: dm.TestRecord | list[dm.TestRecord]): + raise NotImplementedError + + def _fill_test_record_attributes( + self, + attributes_type: type[ + api_models.TestrecordsListPostRequestDataItemAttributes + | api_models.TestrecordsSinglePatchRequestDataAttributes + ], + test_record: dm.TestRecord, + ): + type_prefix = attributes_type.__name__ + attributes = attributes_type() + if test_record.result: + attributes.result = test_record.result + if test_record.comment: + attributes.comment = getattr(api_models, 
f"{type_prefix}Comment")()
+            assert attributes.comment
+            if test_record.comment.type:
+                attributes.comment.type = getattr(
+                    api_models, f"{type_prefix}CommentType"
+                )(test_record.comment.type)
+            if test_record.comment.value:
+                attributes.comment.value = test_record.comment.value
+        if test_record.duration != -1:
+            attributes.duration = test_record.duration
+        if test_record.work_item_revision:
+            attributes.test_case_revision = test_record.work_item_revision
+        if test_record.executed:
+            attributes.executed = test_record.executed
+        if test_record.additional_attributes:
+            attributes.additional_properties = (
+                test_record.additional_attributes
+            )
+        return attributes
diff --git a/polarion_rest_api_client/clients/test_runs.py b/polarion_rest_api_client/clients/test_runs.py
new file mode 100644
index 00000000..2ce729bf
--- /dev/null
+++ b/polarion_rest_api_client/clients/test_runs.py
@@ -0,0 +1,204 @@
+# Copyright DB InfraGO AG and contributors
+# SPDX-License-Identifier: Apache-2.0
+
+from polarion_rest_api_client import data_models as dm
+from polarion_rest_api_client.open_api_client import models as api_models
+from polarion_rest_api_client.open_api_client.api.test_runs import (
+    get_test_runs,
+    patch_test_run,
+    post_test_runs,
+)
+
+from . import base_classes as bc
+from . import test_records
+from .base_classes import T
+
+
+class TestRuns(bc.UpdatableItemsClient[dm.TestRun]):
+    def _get(self, *args, **kwargs) -> T:
+        raise NotImplementedError
+
+    def __init__(
+        self, project_id: str, client: "polarion_client.PolarionClient"
+    ):
+        super().__init__(project_id, client)
+        self.records = test_records.TestRecords(project_id, client)
+
+    def _update(self, items: list[dm.TestRun]):
+        for item in items:
+            self._retry_on_error(self._update_single, item)
+
+    def _update_single(self, test_run: dm.TestRun):
+        """Update the given test run."""
+        assert test_run.id
+        response = patch_test_run.sync_detailed(
+            self._project_id,
+            test_run.id,
+            client=self._client.client,
+            body=api_models.TestrunsSinglePatchRequest(
+                data=api_models.TestrunsSinglePatchRequestData(
+                    type=api_models.TestrunsSinglePatchRequestDataType.TESTRUNS,  # pylint: disable=line-too-long
+                    id=f"{self._project_id}/{test_run.id}",
+                    attributes=self._fill_test_run_attributes(
+                        api_models.TestrunsSinglePatchRequestDataAttributes,
+                        test_run,
+                    ),
+                )
+            ),
+        )
+
+        self._raise_on_error(response)
+
+    def _get_multi(
+        self,
+        query: str = "",
+        fields: dict[str, str] | None = None,
+        page_size: int = 100,
+        page_number: int = 1,
+        retry: bool = True,
+    ) -> tuple[list[dm.TestRun], bool]:
+        """Return the test runs on a defined page matching the given query.
+
+        In addition, a flag whether a next page is available is
+        returned. Define a fields dictionary as described in the
+        Polarion API documentation to get certain fields.
+        """
+        if fields is None:
+            fields = self._client.default_fields.testruns
+
+        sparse_fields = self._build_sparse_fields(fields)
+        response = get_test_runs.sync_detailed(
+            self._project_id,
+            client=self._client.client,
+            query=query,
+            fields=sparse_fields,
+            pagenumber=page_number,
+            pagesize=page_size,
+        )
+
+        self._raise_on_error(response)
+
+        parsed_response = response.parsed
+        assert isinstance(parsed_response, api_models.TestrunsListGetResponse)
+
+        test_runs = []
+        for data in parsed_response.data or []:
+            assert isinstance(data.id, str)
+            assert isinstance(
+                data.attributes,
+                api_models.TestrunsListGetResponseDataItemAttributes,
+            )
+            test_runs.append(
+                dm.TestRun(
+                    data.id.split("/")[-1],
+                    self.unset_to_none(data.attributes.type),
+                    self.unset_to_none(data.attributes.status),
+                    self.unset_to_none(data.attributes.title),
+                    self._handle_text_content(
+                        data.attributes.home_page_content
+                    ),
+                    self.unset_to_none(data.attributes.finished_on),
+                    self.unset_to_none(data.attributes.group_id),
+                    self.unset_to_none(data.attributes.id_prefix),
+                    self.unset_to_none(data.attributes.is_template),
+                    self.unset_to_none(data.attributes.keep_in_history),
+                    self.unset_to_none(data.attributes.query),
+                    self.unset_to_none(
+                        data.attributes.use_report_from_template
+                    ),
+                    (
+                        dm.SelectTestCasesBy(
+                            str(data.attributes.select_test_cases_by)
+                        )
+                        if data.attributes.select_test_cases_by
+                        else None
+                    ),
+                    data.attributes.additional_properties or {},
+                )
+            )
+        next_page = isinstance(
+            parsed_response.links,
+            api_models.TestrunsListGetResponseLinks,
+        ) and bool(parsed_response.links.next_)
+
+        return test_runs, next_page
+
+    def _create(self, test_runs: list[dm.TestRun], retry: bool = True):
+        """Create the given list of test runs."""
+        polarion_test_runs = [
+            api_models.TestrunsListPostRequestDataItem(
+                type=api_models.TestrunsListPostRequestDataItemType.TESTRUNS,
+                attributes=self._fill_test_run_attributes(
+                    api_models.TestrunsListPostRequestDataItemAttributes,
+                    test_run,
+                ),
+            )
+            for test_run in test_runs
+        ]
+
+        response = post_test_runs.sync_detailed(
+            self._project_id,
+            client=self._client.client,
+            body=api_models.TestrunsListPostRequest(polarion_test_runs),
+        )
+
+        self._raise_on_error(response)
+
+    def _delete(self, items: dm.TestRun | list[dm.TestRun]):
+        raise NotImplementedError
+
+    def _fill_test_run_attributes(
+        self,
+        attributes_type: type[
+            api_models.TestrunsListPostRequestDataItemAttributes
+            | api_models.TestrunsSinglePatchRequestDataAttributes
+        ],
+        test_run: dm.TestRun,
+    ):
+        type_prefix = attributes_type.__name__
+        attributes = attributes_type()
+        if test_run.type is not None:
+            attributes.type = test_run.type
+        if test_run.id and hasattr(attributes, "id"):
+            attributes.id = test_run.id
+        if test_run.status is not None:
+            attributes.status = test_run.status
+        if test_run.title is not None:
+            attributes.title = test_run.title
+        if test_run.finished_on is not None:
+            attributes.finished_on = test_run.finished_on
+        if test_run.group_id is not None:
+            attributes.group_id = test_run.group_id
+        if test_run.id_prefix is not None:
+            attributes.id_prefix = test_run.id_prefix
+        if test_run.is_template is not None and hasattr(
+            attributes, "is_template"
+        ):
+            attributes.is_template = test_run.is_template
+        if test_run.keep_in_history is not None:
+            attributes.keep_in_history = test_run.keep_in_history
+        if test_run.query is not None:
+            attributes.query = test_run.query
+        if test_run.use_report_from_template is not None:
+            attributes.use_report_from_template = (
test_run.use_report_from_template + ) + if test_run.additional_attributes: + attributes.additional_properties = test_run.additional_attributes + if test_run.select_test_cases_by: + attributes.select_test_cases_by = getattr( + api_models, f"{type_prefix}SelectTestCasesBy" + )(test_run.select_test_cases_by.value) + if test_run.home_page_content: + attributes.home_page_content = getattr( + api_models, f"{type_prefix}HomePageContent" + )() + assert attributes.home_page_content + if test_run.home_page_content.type: + attributes.home_page_content.type = getattr( + api_models, f"{type_prefix}HomePageContentType" + )(test_run.home_page_content.type) + if test_run.home_page_content.value: + attributes.home_page_content.value = ( + test_run.home_page_content.value + ) + + return attributes diff --git a/polarion_rest_api_client/clients/work_item_attachments.py b/polarion_rest_api_client/clients/work_item_attachments.py new file mode 100644 index 00000000..cd7df80b --- /dev/null +++ b/polarion_rest_api_client/clients/work_item_attachments.py @@ -0,0 +1,215 @@ +# Copyright DB InfraGO AG and contributors +# SPDX-License-Identifier: Apache-2.0 + +import io + +from polarion_rest_api_client import data_models as dm +from polarion_rest_api_client.open_api_client import models as api_models +from polarion_rest_api_client.open_api_client import types as oa_types +from polarion_rest_api_client.open_api_client.api.work_item_attachments import ( # pylint: disable=line-too-long + delete_work_item_attachment, + get_work_item_attachments, + patch_work_item_attachment, + post_work_item_attachments, +) + +from . import base_classes as bc + + +class WorkItemAttachments(bc.UpdatableItemsClient[dm.WorkItemAttachment]): + def _get(self, *args, **kwargs) -> dm.WorkItemAttachment: + raise NotImplementedError + + def _update( + self, items: dm.WorkItemAttachment | list[dm.WorkItemAttachment] + ): + for work_item_attachment in items: + self._retry_on_error(self._update_single, work_item_attachment) + + def _update_single(self, work_item_attachment: dm.WorkItemAttachment): + """Update the given work item attachment in Polarion.""" + attributes = ( + api_models.WorkitemAttachmentsSinglePatchRequestDataAttributes() + ) + if work_item_attachment.title: + attributes.title = work_item_attachment.title + + multipart = api_models.PatchWorkItemAttachmentsRequestBody( + resource=api_models.WorkitemAttachmentsSinglePatchRequest( + data=api_models.WorkitemAttachmentsSinglePatchRequestData( + type=api_models.WorkitemAttachmentsSinglePatchRequestDataType.WORKITEM_ATTACHMENTS, # pylint: disable=line-too-long + id=f"{self._project_id}/{work_item_attachment.work_item_id}/{work_item_attachment.id}", # pylint: disable=line-too-long + attributes=attributes, + ) + ) + ) + + if work_item_attachment.content_bytes: + multipart.content = oa_types.File( + io.BytesIO(work_item_attachment.content_bytes), + work_item_attachment.file_name, + work_item_attachment.mime_type, + ) + + response = patch_work_item_attachment.sync_detailed( + self._project_id, + work_item_attachment.work_item_id, + work_item_attachment.id, + client=self._client.client, + body=multipart, + ) + self._raise_on_error(response) + + def _get_multi( + self, + work_item_id: str, + fields: dict[str, str] | None = None, + page_size: int = 100, + page_number: int = 1, + retry: bool = True, + ) -> tuple[list[dm.WorkItemAttachment], bool]: + """Return the attachments for a given work item on a defined page. + + In addition, a flag whether a next page is available is + returned. 
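For the attachment update path just shown, a hedged usage sketch; it assumes `dm.WorkItemAttachment` exposes `content_bytes` and `mime_type` fields (as the attribute accesses above suggest) and reuses a hypothetical `project_client`:

```python
from polarion_rest_api_client import data_models as dm

# Update an attachment's title and content; ids and payload are
# made-up values.
attachment = dm.WorkItemAttachment(
    "MyWorkItemId",    # work_item_id
    "MyAttachmentId",  # attachment id
    "Updated title",
    file_name="report.html",
    content_bytes=b"<html></html>",  # assumed field, see usage above
    mime_type="text/html",           # assumed field, see usage above
)
project_client.work_items.attachments.update(attachment)
```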
Define a fields dictionary as described in the
+        Polarion API documentation to get certain fields.
+        """
+        if fields is None:
+            fields = self._client.default_fields.workitem_attachments
+
+        sparse_fields = self._build_sparse_fields(fields)
+        response = get_work_item_attachments.sync_detailed(
+            self._project_id,
+            work_item_id=work_item_id,
+            client=self._client.client,
+            fields=sparse_fields,
+            pagesize=page_size,
+            pagenumber=page_number,
+        )
+
+        self._raise_on_error(response)
+
+        parsed_response = response.parsed
+
+        work_item_attachments: list[dm.WorkItemAttachment] = []
+
+        next_page = False
+
+        if (
+            isinstance(
+                parsed_response, api_models.WorkitemAttachmentsListGetResponse
+            )
+            and parsed_response.data
+        ):
+            for attachment in parsed_response.data:
+                assert attachment.attributes
+                assert isinstance(attachment.attributes.id, str)
+
+                work_item_attachments.append(
+                    dm.WorkItemAttachment(
+                        work_item_id,
+                        attachment.attributes.id,
+                        self.unset_to_none(attachment.attributes.title),
+                        file_name=self.unset_to_none(
+                            attachment.attributes.file_name
+                        ),
+                    )
+                )
+
+            next_page = isinstance(
+                parsed_response.links,
+                api_models.WorkitemAttachmentsListGetResponseLinks,
+            ) and bool(parsed_response.links.next_)
+
+        return work_item_attachments, next_page
+
+    def _create(self, work_item_attachments: list[dm.WorkItemAttachment]):
+        """Create the given work item attachment in Polarion."""
+        response = self._retry_on_error(
+            self._perform_creation, work_item_attachments
+        )
+        assert (
+            isinstance(
+                response.parsed, api_models.WorkitemAttachmentsListPostResponse
+            )
+            and response.parsed.data
+        )
+        counter = 0
+        for work_item_attachment_res in response.parsed.data:
+            assert work_item_attachment_res.id
+            work_item_attachments[counter].id = (
+                work_item_attachment_res.id.split("/")[-1]
+            )
+            counter += 1
+
+    def _perform_creation(self, work_item_attachments):
+        attachment_attributes = []
+        attachment_files = []
+        assert len(work_item_attachments), "No attachments were provided."
+        assert all(
+            wia.work_item_id == work_item_attachments[0].work_item_id
+            for wia in work_item_attachments
+        ), "All attachments must belong to the same WorkItem."
+        for work_item_attachment in work_item_attachments:
+            assert (
+                work_item_attachment.file_name
+            ), "You have to define a FileName."
+            assert (
+                work_item_attachment.content_bytes
+            ), "You have to provide content bytes."
+            assert (
+                work_item_attachment.mime_type
+            ), "You have to provide a mime_type."
+ + attributes = api_models.WorkitemAttachmentsListPostRequestDataItemAttributes( + # pylint: disable=line-too-long + file_name=work_item_attachment.file_name + ) + if work_item_attachment.title: + attributes.title = work_item_attachment.title + + attachment_attributes.append( + api_models.WorkitemAttachmentsListPostRequestDataItem( + type=api_models.WorkitemAttachmentsListPostRequestDataItemType.WORKITEM_ATTACHMENTS, + # pylint: disable=line-too-long + attributes=attributes, + ) + ) + + attachment_files.append( + oa_types.File( + io.BytesIO(work_item_attachment.content_bytes), + work_item_attachment.file_name, + work_item_attachment.mime_type, + ) + ) + multipart = api_models.PostWorkItemAttachmentsRequestBody( + resource=api_models.WorkitemAttachmentsListPostRequest( + attachment_attributes + ), + files=attachment_files, + ) + response = post_work_item_attachments.sync_detailed( + self._project_id, + work_item_attachments[0].work_item_id, + client=self._client.client, + body=multipart, + ) + self._raise_on_error(response) + return response + + def _delete( + self, items: dm.WorkItemAttachment | list[dm.WorkItemAttachment] + ): + for item in items: + self._retry_on_error(self._single_delete, item) + + def _single_delete(self, work_item_attachment: dm.WorkItemAttachment): + """Delete the given work item attachment.""" + response = delete_work_item_attachment.sync_detailed( + self._project_id, + work_item_attachment.work_item_id, + work_item_attachment.id, + client=self._client.client, + ) + self._raise_on_error(response) diff --git a/polarion_rest_api_client/clients/work_item_links.py b/polarion_rest_api_client/clients/work_item_links.py new file mode 100644 index 00000000..c156e6b4 --- /dev/null +++ b/polarion_rest_api_client/clients/work_item_links.py @@ -0,0 +1,160 @@ +# Copyright DB InfraGO AG and contributors +# SPDX-License-Identifier: Apache-2.0 + +import itertools +import typing as t + +from polarion_rest_api_client import data_models as dm +from polarion_rest_api_client.open_api_client import models as api_models +from polarion_rest_api_client.open_api_client import types as oa_types +from polarion_rest_api_client.open_api_client.api.linked_work_items import ( + delete_linked_work_items, + get_linked_work_items, + post_linked_work_items, +) + +from . import base_classes as bc +from .base_classes import T + + +class WorkItemLinks(bc.ItemsClient[dm.WorkItemLink]): + def _get(self, *args, **kwargs) -> tuple[list[T], bool]: + raise NotImplementedError + + def _get_multi( + self, + work_item_id: str, + fields: dict[str, str] | None = None, + include: str | None | oa_types.Unset = None, + page_size: int = 100, + page_number: int = 1, + retry: bool = True, + ) -> tuple[list[dm.WorkItemLink], bool]: + """Get the work item links for the given work item on a page. + + In addition, a flag whether a next page is available is + returned. Define a fields dictionary as described in the + Polarion API documentation to get certain fields. 
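The link ids returned by the API are parsed by `_parse_work_item_link` below; a small self-contained sketch of the five-segment format it expects (values are illustrative):

```python
# A linked-work-item id consists of five slash-separated segments:
# primary project / primary work item / role / target project /
# target work item. The parser keeps the last three segments.
link_id = "PROJ/WI-1/relates_to/OTHERPROJ/WI-2"
info = link_id.split("/")
assert len(info) == 5
role_id, target_project_id, linked_work_item_id = info[2:]
assert role_id == "relates_to"
assert target_project_id == "OTHERPROJ"
assert linked_work_item_id == "WI-2"
```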
+ """ + if fields is None: + fields = self._client.default_fields.linkedworkitems + + if include is None: + include = oa_types.UNSET + + sparse_fields = self._build_sparse_fields(fields) + response = get_linked_work_items.sync_detailed( + self._project_id, + work_item_id, + client=self._client.client, + fields=sparse_fields, + include=include, + pagesize=page_size, + pagenumber=page_number, + ) + + self._raise_on_error(response) + + linked_work_item_response = response.parsed + + work_item_links: list[dm.WorkItemLink] = [] + next_page = False + if ( + isinstance( + linked_work_item_response, + api_models.LinkedworkitemsListGetResponse, + ) + and linked_work_item_response.data + ): + for link in linked_work_item_response.data: + assert isinstance(link.id, str) + assert isinstance( + link.attributes, + api_models.LinkedworkitemsListGetResponseDataItemAttributes, # pylint: disable=line-too-long + ) + + work_item_links.append( + self._parse_work_item_link( + link.id, link.attributes.suspect, work_item_id + ) + ) + + next_page = isinstance( + linked_work_item_response.links, + api_models.LinkedworkitemsListGetResponseLinks, + ) and bool(linked_work_item_response.links.next_) + + return work_item_links, next_page + + def _parse_work_item_link(self, link_id, suspect, work_item_id): + info = link_id.split("/") + assert len(info) == 5 + role_id, target_project_id, linked_work_item_id = info[2:] + if isinstance(suspect, oa_types.Unset): + suspect = False + work_item_link = dm.WorkItemLink( + work_item_id, + linked_work_item_id, + role_id, + suspect, + target_project_id, + ) + return work_item_link + + def _split_into_batches( + self, items: list[dm.WorkItemLink] + ) -> t.Generator[list[dm.WorkItemLink], None, None]: + for _, group in itertools.groupby( + items, lambda x: x.primary_work_item_id + ): + yield from super()._split_into_batches(list(group)) + + def _create(self, work_item_links: list[dm.WorkItemLink]): + response = post_linked_work_items.sync_detailed( + self._project_id, + work_item_links[0].primary_work_item_id, + client=self._client.client, + # pylint: disable=line-too-long + body=api_models.LinkedworkitemsListPostRequest( + data=[ + api_models.LinkedworkitemsListPostRequestDataItem( + type=api_models.LinkedworkitemsListPostRequestDataItemType.LINKEDWORKITEMS, + attributes=api_models.LinkedworkitemsListPostRequestDataItemAttributes( + role=work_item_link.role, + suspect=work_item_link.suspect or False, + ), + relationships=api_models.LinkedworkitemsListPostRequestDataItemRelationships( + work_item=api_models.LinkedworkitemsListPostRequestDataItemRelationshipsWorkItem( + data=api_models.LinkedworkitemsListPostRequestDataItemRelationshipsWorkItemData( + type=api_models.LinkedworkitemsListPostRequestDataItemRelationshipsWorkItemDataType.WORKITEMS, + id=f"{work_item_link.secondary_work_item_project or self._project_id}/{work_item_link.secondary_work_item_id}", + ) + ) + ), + ) + for work_item_link in work_item_links + ] + ), + # pylint: enable=line-too-long + ) + + self._raise_on_error(response) + + def _delete(self, work_item_links: list[dm.WorkItemLink]): + response = delete_linked_work_items.sync_detailed( + self._project_id, + work_item_links[0].primary_work_item_id, + client=self._client.client, + # pylint: disable=line-too-long + body=api_models.LinkedworkitemsListDeleteRequest( + data=[ + api_models.LinkedworkitemsListDeleteRequestDataItem( + type=api_models.LinkedworkitemsListDeleteRequestDataItemType.LINKEDWORKITEMS, + 
id=f"{self._project_id}/{work_item_link.primary_work_item_id}/{work_item_link.role}/{work_item_link.secondary_work_item_project or self._project_id}/{work_item_link.secondary_work_item_id}", + ) + for work_item_link in work_item_links + ] + ), + # pylint: enable=line-too-long + ) + self._raise_on_error(response) diff --git a/polarion_rest_api_client/clients/work_items.py b/polarion_rest_api_client/clients/work_items.py new file mode 100644 index 00000000..937d5ecb --- /dev/null +++ b/polarion_rest_api_client/clients/work_items.py @@ -0,0 +1,408 @@ +# Copyright DB InfraGO AG and contributors +# SPDX-License-Identifier: Apache-2.0 + +import json +import logging +import typing as t + +from polarion_rest_api_client import data_models as dm +from polarion_rest_api_client import errors +from polarion_rest_api_client.open_api_client import models as api_models +from polarion_rest_api_client.open_api_client import types as oa_types +from polarion_rest_api_client.open_api_client.api.work_items import ( + delete_work_items, + get_work_item, + get_work_items, + patch_work_item, + post_work_items, +) + +from . import base_classes as bc +from . import work_item_attachments, work_item_links + +WorkItemType = t.TypeVar("WorkItemType", bound=dm.WorkItem) +logger = logging.getLogger(__name__) + + +def _get_json_content_size(data: dict): + return len(json.dumps(data).encode("utf-8")) + + +min_wi_request_size = _get_json_content_size( + api_models.WorkitemsListPostRequest(data=[]).to_dict() +) + + +class WorkItems(bc.StatusItemClient, t.Generic[WorkItemType]): + def __init__( + self, + project_id: str, + client: "polarion_client.PolarionClient", + delete_status: str | None = None, + add_work_item_checksum: bool = False, + ): + super().__init__(project_id, client, delete_status) + self.attachments = work_item_attachments.WorkItemAttachments( + project_id, client + ) + self.links = work_item_links.WorkItemLinks(project_id, client) + self.add_work_item_checksum = add_work_item_checksum + + def _update_single(self, work_item: WorkItemType): + assert work_item.id is not None + if work_item.type: + logger.warning( + "Attempting to change the type of Work Item %s to %s.", + work_item.id, + work_item.type, + ) + + response = patch_work_item.sync_detailed( + self._project_id, + work_item.id, + client=self._client.client, + change_type_to=work_item.type or oa_types.UNSET, + body=self._build_work_item_patch_request(work_item), + ) + self._raise_on_error(response) + + def _update(self, items: list[WorkItemType]): + for work_item in items: + self._retry_on_error(self._update_single, work_item) + + def _get_multi( + self, + query: str = "", + fields: dict[str, str] | None = None, + work_item_cls: type[WorkItemType] = dm.WorkItem, + page_size: int = 100, + page_number: int = 1, + ) -> tuple[list[WorkItemType], bool]: + """Return the work items on a defined page matching the given query. + + In addition, a flag whether a next page is available is + returned. Define a fields dictionary as described in the + Polarion API documentation to get certain fields. 
+ """ + if fields is None: + fields = self._client.default_fields.workitems + + sparse_fields = self._build_sparse_fields(fields) + response = get_work_items.sync_detailed( + self._project_id, + client=self._client.client, + fields=sparse_fields, + query=query, + pagesize=page_size, + pagenumber=page_number, + ) + + self._raise_on_error(response) + + work_items_response = response.parsed + + work_items: list[WorkItemType] = [] + + next_page = False + if ( + isinstance( + work_items_response, api_models.WorkitemsListGetResponse + ) + and work_items_response.data + ): + work_items = [ + self._generate_work_item(work_item, work_item_cls) + for work_item in work_items_response.data + if not getattr(work_item.meta, "errors", []) + ] + + next_page = isinstance( + work_items_response.links, + api_models.WorkitemsListGetResponseLinks, + ) and bool(work_items_response.links.next_) + + return work_items, next_page + + def _get( + self, + work_item_id: str, + work_item_cls: type[WorkItemType] = dm.WorkItem, + ) -> WorkItemType | None: + """Return one specific work item with all fields. + + This also includes all linked work items and attachments. If + there are to many of these to get them in one request, the + truncated flags for linked_work_items and attachments will be + set to True. + """ + response = get_work_item.sync_detailed( + self._project_id, + work_item_id, + client=self._client.client, + fields=self._build_sparse_fields( + { + "workitems": "@all", + "workitem_attachments": "@all", + "linkedworkitems": "@all", + } + ), + ) + self._raise_on_error(response) + + if isinstance( + response.parsed, api_models.WorkitemsSingleGetResponse + ) and isinstance( + response.parsed.data, api_models.WorkitemsSingleGetResponseData + ): + return self._generate_work_item( + response.parsed.data, work_item_cls + ) + + return None + + def _create(self, items: list[WorkItemType]): + raise NotImplementedError("We have a custom create instead.") + + def create(self, items: WorkItemType | list[WorkItemType]): + current_batch = api_models.WorkitemsListPostRequest(data=[]) + content_size = min_wi_request_size + batch_start_index = 0 + batch_end_index = 0 + + for work_item in items: + work_item_data = self._build_work_item_post_request(work_item) + + ( + proj_content_size, + too_big, + ) = self._calculate_post_work_item_request_sizes( + work_item_data, content_size + ) + + if too_big: + raise errors.PolarionWorkItemException( + "A WorkItem is too large to create.", work_item + ) + + assert isinstance(current_batch.data, list) + if ( + proj_content_size >= self._client.max_content_size + or len(current_batch.data) >= self._client.batch_size + ): + self._retry_on_error( + self._post_work_item_batch, + current_batch, + items[batch_start_index:batch_end_index], + ) + + current_batch = api_models.WorkitemsListPostRequest( + data=[work_item_data] + ) + content_size = _get_json_content_size(current_batch.to_dict()) + batch_start_index = batch_end_index + else: + assert isinstance(current_batch.data, list) + current_batch.data.append(work_item_data) + content_size = proj_content_size + + batch_end_index += 1 + + if current_batch.data: + self._retry_on_error( + self._post_work_item_batch, + current_batch, + items[batch_start_index:], + ) + + def _delete(self, items: WorkItemType | list[WorkItemType]): + work_item_ids = [work_item.id for work_item in items] + response = delete_work_items.sync_detailed( + self._project_id, + client=self._client.client, + body=api_models.WorkitemsListDeleteRequest( + data=[ + 
api_models.WorkitemsListDeleteRequestDataItem( + type=api_models.WorkitemsListDeleteRequestDataItemType.WORKITEMS, # pylint: disable=line-too-long + id=f"{self._project_id}/{work_item_id}", + ) + for work_item_id in work_item_ids + ] + ), + ) + self._raise_on_error(response) + + def _build_work_item_post_request( + self, work_item: WorkItemType + ) -> api_models.WorkitemsListPostRequestDataItem: + assert work_item.type is not None + assert work_item.title is not None + assert work_item.description is not None + assert work_item.status is not None + + attrs = api_models.WorkitemsListPostRequestDataItemAttributes( + type=work_item.type, + description=api_models.WorkitemsListPostRequestDataItemAttributesDescription( # pylint: disable=line-too-long + type=api_models.WorkitemsListPostRequestDataItemAttributesDescriptionType( # pylint: disable=line-too-long + work_item.description_type + ), + value=work_item.description, + ), + status=work_item.status, + title=work_item.title, + ) + + attrs.additional_properties.update(work_item.additional_attributes) + + if self.add_work_item_checksum: + attrs.additional_properties["checksum"] = ( + work_item.calculate_checksum() + ) + + return api_models.WorkitemsListPostRequestDataItem( + type=api_models.WorkitemsListPostRequestDataItemType.WORKITEMS, + attributes=attrs, + ) + + def _build_work_item_patch_request( + self, work_item: WorkItemType + ) -> api_models.WorkitemsSinglePatchRequest: + attrs = api_models.WorkitemsSinglePatchRequestDataAttributes() + + if work_item.title is not None: + attrs.title = work_item.title + + if work_item.description is not None: + attrs.description = api_models.WorkitemsSinglePatchRequestDataAttributesDescription( # pylint: disable=line-too-long + type=api_models.WorkitemsSinglePatchRequestDataAttributesDescriptionType( # pylint: disable=line-too-long + work_item.description_type + ), + value=work_item.description, + ) + + if work_item.status is not None: + attrs.status = work_item.status + + attrs.additional_properties.update(work_item.additional_attributes) + + if self.add_work_item_checksum: + attrs.additional_properties["checksum"] = ( + work_item.get_current_checksum() + ) + + return api_models.WorkitemsSinglePatchRequest( + data=api_models.WorkitemsSinglePatchRequestData( + type=api_models.WorkitemsSinglePatchRequestDataType.WORKITEMS, + id=f"{self._project_id}/{work_item.id}", + attributes=attrs, + ) + ) + + def _post_work_item_batch( + self, + work_item_batch: api_models.WorkitemsListPostRequest, + work_item_objs: list[WorkItemType], + ): + response = post_work_items.sync_detailed( + self._project_id, client=self._client.client, body=work_item_batch + ) + + self._raise_on_error(response) + + assert ( + isinstance(response.parsed, api_models.WorkitemsListPostResponse) + and response.parsed.data + ) + counter = 0 + for work_item_res in response.parsed.data: + assert work_item_res.id + work_item_objs[counter].id = work_item_res.id.split("/")[-1] + counter += 1 + + def _calculate_post_work_item_request_sizes( + self, + work_item_data: api_models.WorkitemsListPostRequestDataItem, + current_content_size: int = min_wi_request_size, + ) -> t.Tuple[int, bool]: + work_item_size = _get_json_content_size(work_item_data.to_dict()) + + proj_content_size = current_content_size + work_item_size + if current_content_size != min_wi_request_size: + proj_content_size += len(b", ") + + return ( + proj_content_size, + (work_item_size + min_wi_request_size) + > self._client.max_content_size, + ) + + def _generate_work_item( + self, + 
work_item: ( + api_models.WorkitemsListGetResponseDataItem + | api_models.WorkitemsSingleGetResponseData + ), + work_item_cls: t.Type[WorkItemType] = dm.WorkItem, + ) -> WorkItemType: + assert work_item.attributes + assert isinstance(work_item.id, str) + work_item_id = work_item.id.split("/")[-1] + links = [] + attachments = [] + + # We set both truncated flags to True and will only set them to False, + # if the corresponding fields were requested and returned completely + links_truncated = True + attachments_truncated = True + if work_item.relationships: + if links := work_item.relationships.linked_work_items: + if not links.meta or links.meta.total_count is oa_types.UNSET: + links_truncated = False + + links = [ + self.links._parse_work_item_link( + link.id, + link.additional_properties.get("suspect", False), + work_item_id, + ) + for link in links.data or [] + ] + + if attachment_data := work_item.relationships.attachments: + if ( + not attachment_data.meta + or attachment_data.meta.total_count is oa_types.UNSET + ): + attachments_truncated = False + + attachments = [ + dm.WorkItemAttachment( + work_item_id, + attachment.id.split("/")[-1], + None, # title isn't provided + ) + for attachment in attachment_data.data or [] + if attachment.id + ] + + desctype = None + desc = None + if work_item.attributes.description: + desctype = self.unset_to_none( + work_item.attributes.description.type + ) + desc = self.unset_to_none(work_item.attributes.description.value) + + work_item_obj = work_item_cls( + work_item_id, + self.unset_to_none(work_item.attributes.title), + desctype, + desc, + self.unset_to_none(work_item.attributes.type), + self.unset_to_none(work_item.attributes.status), + work_item.attributes.additional_properties, + links, + attachments, + links_truncated, + attachments_truncated, + ) + return work_item_obj diff --git a/polarion_rest_api_client/data_models.py b/polarion_rest_api_client/data_models.py index 978919b9..37d57bd9 100644 --- a/polarion_rest_api_client/data_models.py +++ b/polarion_rest_api_client/data_models.py @@ -13,7 +13,7 @@ @dataclasses.dataclass -class BaseItem: +class StatusItem: """A parent data class for WorkItem and Document.""" id: str | None = None @@ -22,8 +22,8 @@ class BaseItem: _checksum: str | None = dataclasses.field(init=False, default=None) def __eq__(self, other: object) -> bool: - """Compare only BaseItem attributes.""" - if not isinstance(other, BaseItem): + """Compare only StatusItem attributes.""" + if not isinstance(other, StatusItem): return NotImplemented if self.get_current_checksum() is None: self.calculate_checksum() @@ -33,7 +33,7 @@ def __eq__(self, other: object) -> bool: return self.get_current_checksum() == other.get_current_checksum() def to_dict(self) -> dict[str, t.Any]: - """Return the content of the BaseItem as dictionary.""" + """Return the content of the StatusItem as dictionary.""" return { "id": self.id, "type": self.type, @@ -42,7 +42,7 @@ def to_dict(self) -> dict[str, t.Any]: } def calculate_checksum(self) -> str: - """Calculate and return a checksum for this BaseItem. + """Calculate and return a checksum for this StatusItem. In addition, the checksum will be written to self._checksum. 
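Since `StatusItem.__eq__` compares checksums rather than object identity, a minimal sketch of the resulting semantics (field values are made up):

```python
from polarion_rest_api_client import data_models as dm

a = dm.WorkItem(id="WI-1", title="Title", status="open")
b = dm.WorkItem(id="WI-1", title="Title", status="open")
# Checksums are computed over to_dict(); identical content therefore
# yields identical checksums and equal items.
a.calculate_checksum()
b.calculate_checksum()
assert a == b
assert a.get_current_checksum() is not None
```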
""" @@ -61,7 +61,7 @@ def get_current_checksum(self) -> str | None: return self._checksum -class WorkItem(BaseItem): +class WorkItem(StatusItem): """A data class containing all relevant data of a Polarion WorkItem.""" title: str | None = None @@ -208,7 +208,7 @@ class WorkItemAttachment: file_name: str | None = None -class Document(BaseItem): +class Document(StatusItem): """A data class containing all relevant data of a Polarion Document.""" module_folder: str | None = None @@ -231,7 +231,7 @@ def __init__( @dataclasses.dataclass -class TestRun(BaseItem): +class TestRun(StatusItem): """A data class for all data of a test run.""" title: str | None = None @@ -253,6 +253,7 @@ class TestRun(BaseItem): class TestRecord: """A data class for test record data.""" + test_run_id: str work_item_project_id: str work_item_id: str work_item_revision: str | None = None diff --git a/polarion_rest_api_client/old_client.py b/polarion_rest_api_client/old_client.py new file mode 100644 index 00000000..ab9d498d --- /dev/null +++ b/polarion_rest_api_client/old_client.py @@ -0,0 +1,361 @@ +# Copyright DB InfraGO AG and contributors +# SPDX-License-Identifier: Apache-2.0 +"""The actual implementation of the API client using an OpenAPIClient.""" +from __future__ import annotations + +import typing as t + +from polarion_rest_api_client import client +from polarion_rest_api_client import data_models as dm +from polarion_rest_api_client.client import WorkItemType + + +class OpenAPIPolarionProjectClient(t.Generic[WorkItemType]): + """A Polarion Project Client using an auto generated OpenAPI-Client.""" + + delete_status: str = "deleted" + + @t.overload + def __init__( + self: "OpenAPIPolarionProjectClient[client.WorkItemType]", + project_id: str, + delete_polarion_work_items: bool, + polarion_api_endpoint: str, + polarion_access_token: str, + *, + custom_work_item: type[client.WorkItemType], + batch_size: int = ..., + page_size: int = ..., + add_work_item_checksum: bool = False, + max_content_size: int = ..., + httpx_args: t.Optional[dict[str, t.Any]] = ..., + ): ... + + @t.overload + def __init__( + self: "OpenAPIPolarionProjectClient[dm.WorkItem]", + project_id: str, + delete_polarion_work_items: bool, + polarion_api_endpoint: str, + polarion_access_token: str, + *, + batch_size: int = ..., + page_size: int = ..., + add_work_item_checksum: bool = False, + max_content_size: int = ..., + httpx_args: t.Optional[dict[str, t.Any]] = ..., + ): ... + + def __init__( + self, + project_id: str, + delete_polarion_work_items: bool, + polarion_api_endpoint: str, + polarion_access_token: str, + *, + custom_work_item=dm.WorkItem, + batch_size: int = 100, + page_size: int = 100, + add_work_item_checksum: bool = False, + max_content_size: int = 2 * 1024**2, + httpx_args: t.Optional[dict[str, t.Any]] = None, + ): + """Initialize the client for project and endpoint using a token. + + Parameters + ---------- + project_id : str + ID of the project to create a client for. + delete_polarion_work_items : bool + Flag indicating whether to delete work items or set a status. + polarion_api_endpoint : str + The URL of the Polarion API endpoint. + polarion_access_token : str + A personal access token to access the API. + custom_work_item : default dm.WorkItem + Custom WorkItem class with additional attributes. + batch_size : int, default 100 + Maximum amount of items created in one POST request. + page_size : int, default 100 + Default size of a page when getting items from the API. 
+        add_work_item_checksum : bool, default False
+            Flag indicating whether to post WorkItem checksums.
+        max_content_size : int, default 2 * 1024**2
+            Maximum content-length of the API (default: 2MB).
+        httpx_args: t.Optional[dict[str, t.Any]], default None
+            Additional parameters, which will be passed to the httpx client.
+        """
+        polarion_client = client.PolarionClient(
+            polarion_api_endpoint,
+            polarion_access_token,
+            httpx_args,
+            batch_size,
+            page_size,
+            max_content_size,
+        )
+        self.project_client = polarion_client.get_project_client(
+            project_id,
+            None if delete_polarion_work_items else "deleted",
+            add_work_item_checksum,
+        )
+        self.project_id = project_id
+        self.custom_work_item = custom_work_item
+
+    def project_exists(self) -> bool:
+        """Return True if self.project_id exists and False if not."""
+        return self.project_client.exists()
+
+    def delete_work_item_attachment(
+        self, work_item_attachment: dm.WorkItemAttachment
+    ):
+        """Delete the given work item attachment."""
+        self.project_client.work_items.attachments.delete(work_item_attachment)
+
+    def update_work_item_attachment(
+        self, work_item_attachment: dm.WorkItemAttachment
+    ):
+        """Update the given work item attachment in Polarion."""
+        self.project_client.work_items.attachments.update(work_item_attachment)
+
+    def create_work_item_attachments(
+        self,
+        work_item_attachments: list[dm.WorkItemAttachment],
+    ):
+        """Create the given work item attachments in Polarion."""
+        self.project_client.work_items.attachments.create(
+            work_item_attachments
+        )
+
+    def get_work_item(
+        self,
+        work_item_id: str,
+    ) -> client.WorkItemType | None:
+        """Return one specific work item with all fields.
+
+        This also includes all linked work items and attachments. If
+        there are too many of these to get them in one request, the
+        truncated flags for linked_work_items and attachments will be
+        set to True.
+        """
+        return self.project_client.work_items.get(
+            work_item_id, self.custom_work_item
+        )
+
+    def get_document(
+        self,
+        space_id: str,
+        document_name: str,
+        fields: dict[str, str] | None = None,
+        include: str | None = None,
+        revision: str | None = None,
+    ) -> dm.Document | None:
+        """Return the document with the given document_name and space_id."""
+        return self.project_client.documents.get(
+            space_id, document_name, fields, include, revision
+        )
+
+    def create_work_items(self, work_items: list[client.WorkItemType]):
+        """Create the given list of work items."""
+        self.project_client.work_items.create(work_items)
+
+    def update_work_item(self, work_item: client.WorkItemType):
+        """Update the given work item in Polarion.
+
+        Only fields not set to None will be updated in Polarion. None
+        fields will stay untouched.
+        """
+        self.project_client.work_items.update(work_item)
+
+    def get_work_item_links(
+        self,
+        work_item_id: str,
+        fields: dict[str, str] | None = None,
+        include: str | None = None,
+        page_size: int = 100,
+        page_number: int = 1,
+    ) -> tuple[list[dm.WorkItemLink], bool]:
+        """Get the work item links for the given work item on a page.
+
+        In addition, a flag whether a next page is available is
+        returned. Define a fields dictionary as described in the
+        Polarion API documentation to get certain fields.
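A hypothetical paging loop against this wrapper; endpoint, token and ids are placeholders:

```python
import polarion_rest_api_client as polarion_api

client = polarion_api.OpenAPIPolarionProjectClient(
    "MyProjectId",
    delete_polarion_work_items=False,
    polarion_api_endpoint="https://polarion.example.com/rest/v1",
    polarion_access_token="TOKEN",
)
page = 1
links, has_next = client.get_work_item_links(
    "MyWorkItemId", page_size=100, page_number=page
)
while has_next:
    page += 1
    more, has_next = client.get_work_item_links(
        "MyWorkItemId", page_size=100, page_number=page
    )
    links += more
```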
+        """
+        return self.project_client.work_items.links.get_multi(
+            work_item_id,
+            fields,
+            include,
+            page_size=page_size,
+            page_number=page_number,
+        )
+
+    def get_test_records(
+        self,
+        test_run_id: str,
+        fields: dict[str, str] | None = None,
+        page_size: int = 100,
+        page_number: int = 1,
+    ) -> tuple[list[dm.TestRecord], bool]:
+        """Return the test records on a defined page matching the given query.
+
+        In addition, a flag whether a next page is available is
+        returned. Define a fields dictionary as described in the
+        Polarion API documentation to get certain fields.
+        """
+        return self.project_client.test_runs.records.get_multi(
+            test_run_id, fields, page_size=page_size, page_number=page_number
+        )
+
+    def get_test_runs(
+        self,
+        query: str = "",
+        fields: dict[str, str] | None = None,
+        page_size: int = 100,
+        page_number: int = 1,
+    ) -> tuple[list[dm.TestRun], bool]:
+        """Return the test runs on a defined page matching the given query.
+
+        In addition, a flag whether a next page is available is
+        returned. Define a fields dictionary as described in the
+        Polarion API documentation to get certain fields.
+        """
+        return self.project_client.test_runs.get_multi(
+            query, fields, page_size=page_size, page_number=page_number
+        )
+
+    def create_test_runs(self, test_runs: list[dm.TestRun]):
+        """Create the given list of test runs."""
+        self.project_client.test_runs.create(test_runs)
+
+    def update_test_run(self, test_run: dm.TestRun):
+        """Update the given test run."""
+        self.project_client.test_runs.update(test_run)
+
+    def create_test_records(
+        self,
+        test_run_id: str,
+        test_records: list[dm.TestRecord],
+    ):
+        """Create the given list of test records."""
+        for rec in test_records:
+            rec.test_run_id = test_run_id
+        self.project_client.test_runs.records.create(test_records)
+
+    def update_test_record(self, test_run_id: str, test_record: dm.TestRecord):
+        """Update the given test record."""
+        test_record.test_run_id = test_run_id
+        self.project_client.test_runs.records.update(test_record)
+
+    def get_all_work_item_attachments(
+        self, work_item_id: str, fields: dict[str, str] | None = None
+    ) -> list[dm.WorkItemAttachment]:
+        """Get all work item attachments for a given work item.
+
+        Will handle pagination automatically. Define a fields dictionary
+        as described in the Polarion API documentation to get certain
+        fields.
+        """
+        return self.project_client.work_items.attachments.get_all(
+            work_item_id, fields
+        )
+
+    def create_work_item_attachment(
+        self, work_item_attachment: dm.WorkItemAttachment
+    ):
+        """Create the given work item attachment in Polarion."""
+        self.project_client.work_items.attachments.create(work_item_attachment)
+
+    def get_all_work_items(
+        self, query: str = "", fields: dict[str, str] | None = None
+    ) -> list[WorkItemType]:
+        """Get all work items matching the given query.
+
+        Will handle pagination automatically. Define a fields dictionary
+        as described in the Polarion API documentation to get certain
+        fields.
+ """ + return self.project_client.work_items.get_all( + query, fields, self.custom_work_item + ) + + def create_work_item(self, work_item: WorkItemType): + """Create a single given work item.""" + self.create_work_items([work_item]) + + def delete_work_items(self, work_item_ids: list[str]): + """Delete or mark the defined work items as deleted.""" + self.project_client.work_items.delete( + [dm.WorkItem(wid) for wid in work_item_ids] + ) + + def delete_work_item(self, work_item_id: str): + """Delete or mark the defined work item as deleted.""" + return self.delete_work_items([work_item_id]) + + def get_all_work_item_links( + self, + work_item_id: str, + fields: dict[str, str] | None = None, + include: str | None = None, + ) -> list[dm.WorkItemLink]: + """Get all work item links for the given work item. + + Define a fields dictionary as described in the Polarion API + documentation to get certain fields. + """ + return self.project_client.work_items.links.get_all( + work_item_id=work_item_id, + fields=fields, + include=include, + ) + + def create_work_item_links(self, work_item_links: list[dm.WorkItemLink]): + """Create the links between the work items in work_item_links.""" + self.project_client.work_items.links.create(work_item_links) + + def create_work_item_link(self, work_item_link: dm.WorkItemLink): + """Create the link between the work items in work_item_link.""" + self.create_work_item_links([work_item_link]) + + def delete_work_item_links(self, work_item_links: list[dm.WorkItemLink]): + """Delete the links between the work items in work_item_link.""" + self.project_client.work_items.links.delete(work_item_links) + + def delete_work_item_link(self, work_item_link: dm.WorkItemLink): + """Delete the links between the work items in work_item_link.""" + self.delete_work_item_links([work_item_link]) + + def get_all_test_runs( + self, + query: str = "", + fields: dict[str, str] | None = None, + ) -> list[dm.TestRun]: + """Get all test runs matching the given query. + + Will handle pagination automatically. Define a fields dictionary + as described in the Polarion API documentation to get certain + fields. + """ + return self.project_client.test_runs.get_all(query, fields) + + def get_all_test_records( + self, + test_run_id: str, + fields: dict[str, str] | None = None, + ) -> list[dm.TestRecord]: + """Get all test records matching the given query. + + Will handle pagination automatically. Define a fields dictionary + as described in the Polarion API documentation to get certain + fields. 
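In contrast to the page-wise getters, the `get_all_*` wrappers above page through the API transparently; a short sketch reusing the `client` from the earlier example (the query and ids are placeholders):

```python
# Collect every open test run, then all records of each run.
for run in client.get_all_test_runs("status:open"):
    assert run.id is not None
    records = client.get_all_test_records(run.id)
```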
+        """
+        return self.project_client.test_runs.records.get_all(
+            test_run_id, fields
+        )
+
+    def create_test_run(self, test_run: dm.TestRun):
+        """Create the given test run."""
+        self.create_test_runs([test_run])
+
+    def create_test_record(self, test_run_id: str, test_record: dm.TestRecord):
+        """Create the given test record for the given test run."""
+        self.create_test_records(test_run_id, [test_record])
diff --git a/tests/test_client_testrecords.py b/tests/test_client_testrecords.py
index 1f0c5c4a..bc33fb9f 100644
--- a/tests/test_client_testrecords.py
+++ b/tests/test_client_testrecords.py
@@ -65,6 +65,7 @@ def test_create_test_records(
     test_run_id = "asdfg"
 
     tr_1 = polarion_api.TestRecord(
+        test_run_id,
         "MyProjectId",
         "MyWorkItemId",
         "0",
@@ -74,6 +75,7 @@
         comment=polarion_api.TextContent("text/html", "My text value"),
     )
     tr_2 = polarion_api.TestRecord(
+        test_run_id,
         "MyProjectId",
         "MyWorkItemId",
         "1234",
@@ -111,6 +113,7 @@ def test_update_test_record(
     work_item_project = "MyProjectId"
 
     tr_1 = polarion_api.TestRecord(
+        test_run_id,
         work_item_project,
         work_item_id,
         iteration=4,
diff --git a/tests/test_client_workitemlinks.py b/tests/test_client_workitemlinks.py
index 2d52734d..e92239e6 100644
--- a/tests/test_client_workitemlinks.py
+++ b/tests/test_client_workitemlinks.py
@@ -126,7 +126,9 @@ def test_delete_work_item_links(
     assert req is not None and req.method == "DELETE"
     with open(TEST_WIL_DELETE_REQUEST, encoding="utf8") as f:
-        assert json.loads(req.content.decode()) == json.load(f)
+        expected_request = json.load(f)
+
+    assert json.loads(req.content.decode()) == expected_request
 
 
 def test_delete_work_item_links_multi_primary(
@@ -152,9 +154,13 @@
     assert reqs[0].method == "DELETE"
     assert reqs[1].method == "DELETE"
     with open(TEST_WIL_DELETED_REQUEST, encoding="utf8") as f:
-        assert json.loads(reqs[0].content.decode()) == json.load(f)
+        expected_request = json.load(f)
+
+    assert json.loads(reqs[0].content.decode()) == expected_request
     with open(TEST_WIL_DELETE2_REQUEST, encoding="utf8") as f:
-        assert json.loads(reqs[1].content.decode()) == json.load(f)
+        expected_request = json.load(f)
+
+    assert json.loads(reqs[1].content.decode()) == expected_request
 
 
 def test_create_work_item_link(
@@ -272,7 +278,11 @@ def test_get_work_item_links_error_first_request(
     assert len(caplog.record_tuples) == 1
     _, level, message = caplog.record_tuples[0]
    assert level == 30
-    assert message == "Received error response code 502 with content b'Test'."
+    assert (
+        message == "Will retry after failing on first attempt, due to "
+        "the following error "
+        "(<Response [502 Bad Gateway]>, b'Test')"
+    )
     assert len(reqs) == 2
     assert work_item_links[0] == polarion_api.WorkItemLink(
         "MyWorkItemId", "MyWorkItemId2", "relates_to", True, "MyProjectId"
diff --git a/tests/test_client_workitems.py b/tests/test_client_workitems.py
index 9b4b1430..974e73eb 100644
--- a/tests/test_client_workitems.py
+++ b/tests/test_client_workitems.py
@@ -122,7 +122,9 @@ def test_get_all_work_items_single_page(
     with open(TEST_WI_NO_NEXT_PAGE_RESPONSE, encoding="utf8") as f:
         httpx_mock.add_response(json=json.load(f))
 
-    client.default_fields.workitems = "@basic,description"
+    client.project_client._client.default_fields.workitems = (
+        "@basic,description"
+    )
 
     work_items = client.get_all_work_items("")
 
@@ -205,7 +207,7 @@ def test_create_work_item_checksum(
     checksum = work_item.calculate_checksum()
 
-    client.add_work_item_checksum = True
+    client.project_client.work_items.add_work_item_checksum = True
     client.create_work_item(work_item)
 
     req = httpx_mock.get_request()
@@ -345,8 +347,16 @@ def test_work_item_single_request_size(
 
     httpx_mock.add_response(201, json=mock_response)
 
-    work_item_data = client._build_work_item_post_request(work_item)
-    size, _ = client._calculate_post_work_item_request_sizes(work_item_data)
+    work_item_data = (
+        client.project_client.work_items._build_work_item_post_request(
+            work_item
+        )
+    )
+    size, _ = (
+        client.project_client.work_items._calculate_post_work_item_request_sizes(
+            work_item_data
+        )
+    )
 
     client.create_work_items([work_item])
 
@@ -371,13 +381,21 @@ def test_work_item_multi_request_size(
         )
     )
 
-    work_item_data = client._build_work_item_post_request(work_item)
+    work_item_data = (
+        client.project_client.work_items._build_work_item_post_request(
+            work_item
+        )
+    )
 
-    size, _ = client._calculate_post_work_item_request_sizes(
-        work_item_data, size
+    size, _ = (
+        client.project_client.work_items._calculate_post_work_item_request_sizes(
+            work_item_data, size
+        )
     )
-    size, _ = client._calculate_post_work_item_request_sizes(
-        work_item_data, size
+    size, _ = (
+        client.project_client.work_items._calculate_post_work_item_request_sizes(
+            work_item_data, size
+        )
     )
 
     client.create_work_items(2 * [work_item])
 
@@ -451,7 +469,7 @@ def test_update_work_item_completely_checksum(
     spy = mocker.spy(work_item_patch, "calculate_checksum")
     checksum = work_item_patch.calculate_checksum()
 
-    client.add_work_item_checksum = True
+    client.project_client.work_items.add_work_item_checksum = True
     client.update_work_item(work_item_patch)
 
     req = httpx_mock.get_request()
@@ -572,7 +590,7 @@ def test_delete_work_item_delete_mode(
 ):
     httpx_mock.add_response(204)
 
-    client.delete_polarion_work_items = True
+    client.project_client.work_items.delete_status = None
 
     client.delete_work_item("MyWorkItemId")
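
A brief usage sketch for reviewers of the wrapper API changed above: the paginated get_* accessors return the items of one page together with a has-next-page flag, while the get_all_* variants drive the pagination loop internally. The client construction and the "MyTestRunId" value below are placeholders; the constructor arguments are not part of this diff.

import polarion_rest_api_client as polarion_api

# Placeholder construction: the constructor arguments are not shown in
# this diff and depend on the deployment (base URL, project, credentials).
client = polarion_api.OpenAPIPolarionProjectClient(...)

# Manual pagination: each call returns one page of test records plus a
# flag telling whether a further page exists.
records: list[polarion_api.TestRecord] = []
page_number = 1
while True:
    page, has_next = client.get_test_records(
        "MyTestRunId", page_size=100, page_number=page_number
    )
    records.extend(page)
    if not has_next:
        break
    page_number += 1

# Equivalent convenience call; get_all_test_records performs the same
# page-by-page retrieval internally.
records = client.get_all_test_records("MyTestRunId")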