diff --git a/.github/workflows/build-test-publish.yml b/.github/workflows/build-test-publish.yml index 8c4bf81b..19e96791 100644 --- a/.github/workflows/build-test-publish.yml +++ b/.github/workflows/build-test-publish.yml @@ -21,9 +21,6 @@ jobs: - "3.10" - "3.11" - "3.12" - include: - - os: windows-latest - python_version: "3.12" steps: - uses: actions/checkout@v3 - name: Set up Python ${{matrix.python_version}} diff --git a/README.md b/README.md index c25998f6..6d4b6d22 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,8 @@ Synchronise Capella models with Polarion projects Read the [full documentation on GitLab pages](https://dsd-dbs.github.io/capella-polarion). # Installation +We have a dependency on [cairosvg](https://cairosvg.org/). Please check their +[documentation](https://cairosvg.org/documentation/) for OS specific dependencies. You can install the latest released version directly from PyPI (**Not yet**). diff --git a/capella2polarion/__main__.py b/capella2polarion/__main__.py index 8c653c53..8a2ce637 100644 --- a/capella2polarion/__main__.py +++ b/capella2polarion/__main__.py @@ -4,7 +4,6 @@ from __future__ import annotations import logging -import pathlib import typing import capellambse @@ -35,18 +34,6 @@ ) @click.option("--polarion-pat", envvar="POLARION_PAT", type=str) @click.option("--polarion-delete-work-items", is_flag=True, default=False) -@click.option( - "--capella-diagram-cache-folder-path", - type=click.Path( - exists=True, - file_okay=False, - dir_okay=True, - readable=True, - resolve_path=True, - path_type=pathlib.Path, - ), - default=None, -) @click.option("--capella-model", type=cli_helpers.ModelCLI(), default=None) @click.option( "--synchronize-config", @@ -62,18 +49,19 @@ def cli( polarion_url: str, polarion_pat: str, polarion_delete_work_items: bool, - capella_diagram_cache_folder_path: pathlib.Path, capella_model: capellambse.MelodyModel, synchronize_config: typing.TextIO, ) -> None: """Synchronise data from Capella to Polarion.""" + if capella_model.diagram_cache is None: + logger.warning("It's highly recommended to define a diagram cache!") + capella2polarion_cli = Capella2PolarionCli( debug, polarion_project_id, polarion_url, polarion_pat, polarion_delete_work_items, - capella_diagram_cache_folder_path, capella_model, synchronize_config, force_update, @@ -96,28 +84,17 @@ def synchronize(ctx: click.core.Context) -> None: """Synchronise model elements.""" capella_to_polarion_cli: Capella2PolarionCli = ctx.obj logger.info( - "Synchronising diagrams from diagram cache at " - "%s to Polarion project with id %s...", - str(capella_to_polarion_cli.capella_diagram_cache_folder_path), + "Synchronising model elements to Polarion project with id %s...", capella_to_polarion_cli.polarion_params.project_id, ) capella_to_polarion_cli.load_synchronize_config() - capella_to_polarion_cli.load_capella_diagram_cache_index() - - assert ( - capella_to_polarion_cli.capella_diagram_cache_index_content is not None - ) converter = model_converter.ModelConverter( capella_to_polarion_cli.capella_model, - capella_to_polarion_cli.capella_diagram_cache_folder_path, capella_to_polarion_cli.polarion_params.project_id, ) - converter.read_model( - capella_to_polarion_cli.config, - capella_to_polarion_cli.capella_diagram_cache_index_content, - ) + converter.read_model(capella_to_polarion_cli.config) polarion_worker = pw.CapellaPolarionWorker( capella_to_polarion_cli.polarion_params, @@ -133,7 +110,11 @@ def synchronize(ctx: click.core.Context) -> None: polarion_worker.post_work_items(converter.converter_session) # Create missing links for new work items - converter.generate_work_items(polarion_worker.polarion_data_repo, True) + converter.generate_work_items( + polarion_worker.polarion_data_repo, + generate_links=True, + generate_attachments=True, + ) polarion_worker.patch_work_items(converter.converter_session) diff --git a/capella2polarion/cli.py b/capella2polarion/cli.py index c38c2bdb..9de51635 100644 --- a/capella2polarion/cli.py +++ b/capella2polarion/cli.py @@ -3,7 +3,6 @@ """Tool for CLI work.""" from __future__ import annotations -import json import logging import pathlib import typing @@ -27,7 +26,6 @@ def __init__( polarion_url: str, polarion_pat: str, polarion_delete_work_items: bool, - capella_diagram_cache_folder_path: pathlib.Path | None, capella_model: capellambse.MelodyModel, synchronize_config_io: typing.TextIO, force_update: bool = False, @@ -39,22 +37,9 @@ def __init__( polarion_pat, polarion_delete_work_items, ) - if capella_diagram_cache_folder_path is None: - raise ValueError("CapellaDiagramCacheFolderPath not filled") - self.capella_diagram_cache_folder_path = ( - capella_diagram_cache_folder_path - ) - self.capella_diagram_cache_index_file_path = ( - self.capella_diagram_cache_folder_path / "index.json" - ) - self.capella_diagram_cache_index_content: list[ - dict[str, typing.Any] - ] = [] self.capella_model: capellambse.MelodyModel = capella_model self.synchronize_config_io: typing.TextIO = synchronize_config_io - self.synchronize_config_content: dict[str, typing.Any] = {} - self.synchronize_config_roles: dict[str, list[str]] | None = None self.config = converter_config.ConverterConfig() self.force_update = force_update @@ -99,10 +84,6 @@ def _value(value): string_value = self._none_save_value_string(string_value) click.echo(f"{lighted_member_var}: '{string_value}'") - echo = ("NO", "YES")[ - self.capella_diagram_cache_index_file_path.is_file() - ] - click.echo(f"""Capella Diagram Cache Index-File exists: {echo}""") echo = ("YES", "NO")[self.synchronize_config_io.closed] click.echo(f"""Synchronize Config-IO is open: {echo}""") @@ -133,15 +114,3 @@ def load_synchronize_config(self) -> None: if not self.synchronize_config_io.readable(): raise RuntimeError("synchronize config io stream is not readable") self.config.read_config_file(self.synchronize_config_io) - - def load_capella_diagram_cache_index(self) -> None: - """Load Capella Diagram Cache index file content.""" - if not self.capella_diagram_cache_index_file_path.is_file(): - raise ValueError( - "capella diagramm cache index.json file does not exist" - ) - - l_text_content = self.capella_diagram_cache_index_file_path.read_text( - encoding="utf8" - ) - self.capella_diagram_cache_index_content = json.loads(l_text_content) diff --git a/capella2polarion/connectors/polarion_worker.py b/capella2polarion/connectors/polarion_worker.py index c5c97de8..82618665 100644 --- a/capella2polarion/connectors/polarion_worker.py +++ b/capella2polarion/connectors/polarion_worker.py @@ -4,11 +4,14 @@ from __future__ import annotations import collections.abc as cabc +import json import logging import typing as t from urllib import parse import polarion_rest_api_client as polarion_api +from capellambse import helpers as chelpers +from lxml import etree from capella2polarion import data_models from capella2polarion.connectors import polarion_repo @@ -103,8 +106,8 @@ def delete_work_items( ) -> None: """Delete work items in a Polarion project. - If the delete flag is set to ``False`` in the context work items are - marked as ``to be deleted`` via the status attribute. + If the delete flag is set to ``False`` in the context work items + are marked as ``to be deleted`` via the status attribute. """ def serialize_for_delete(uuid: str) -> str: @@ -160,60 +163,116 @@ def patch_work_item( """Patch a given WorkItem.""" new = converter_session[uuid].work_item _, old = self.polarion_data_repo[uuid] - if not self.force_update and new == old: - return assert old is not None assert new is not None - log_args = (old.id, new.type, new.title) - logger.info("Update work item %r for model %s %r...", *log_args) + new.calculate_checksum() + if not self.force_update and new == old: + return - del new.additional_attributes["uuid_capella"] + log_args = (old.id, new.type, new.title) + logger.info( + "Update work item %r for model element %s %r...", *log_args + ) - old = self.client.get_work_item(old.id) + if old.get_current_checksum()[0] != "{": # XXX: Remove in next release + old_checksums = {"__C2P__WORK_ITEM": old.get_current_checksum()} + else: + old_checksums = json.loads(old.get_current_checksum()) - if old.linked_work_items_truncated: - old.linked_work_items = self.client.get_all_work_item_links(old.id) + new_checksums = json.loads(new.get_current_checksum()) - del old.additional_attributes["uuid_capella"] + new_work_item_check_sum = new_checksums.pop("__C2P__WORK_ITEM") + old_work_item_check_sum = old_checksums.pop("__C2P__WORK_ITEM") - # Type will only be updated, if it is set and should be used carefully - if new.type == old.type: - new.type = None - new.status = "open" - - # If additional fields were present in the past, but aren't anymore, - # we have to set them to an empty value manually - defaults = DEFAULT_ATTRIBUTE_VALUES - for attribute, value in old.additional_attributes.items(): - if attribute not in new.additional_attributes: - new.additional_attributes[attribute] = defaults.get( - type(value) + work_item_changed = new_work_item_check_sum != old_work_item_check_sum + try: + if work_item_changed or self.force_update: + old = self.client.get_work_item(old.id) + if old.attachments: + old_attachments = ( + self.client.get_all_work_item_attachments( + work_item_id=old.id + ) + ) + else: + old_attachments = [] + else: + old_attachments = self.client.get_all_work_item_attachments( + work_item_id=old.id + ) + if old_attachments or new.attachments: + self.update_attachments( + new, old_checksums, new_checksums, old_attachments ) - elif new.additional_attributes[attribute] == value: - del new.additional_attributes[attribute] + except polarion_api.PolarionApiException as error: + logger.error( + "Updating attachments for WorkItem %r (%s %s) failed. %s", + *log_args, + error.args[0], + ) + return assert new.id is not None + delete_links = None + create_links = None + + if work_item_changed or self.force_update: + if new.attachments: + self._refactor_attached_images(new) + + del new.additional_attributes["uuid_capella"] + del old.additional_attributes["uuid_capella"] + + if old.linked_work_items_truncated: + old.linked_work_items = self.client.get_all_work_item_links( + old.id + ) + + # Type will only be updated, if set and should be used carefully + if new.type == old.type: + new.type = None + new.status = "open" + + # If additional fields were present, but aren't anymore, + # we have to set them to an empty value manually + defaults = DEFAULT_ATTRIBUTE_VALUES + for attribute, value in old.additional_attributes.items(): + if attribute not in new.additional_attributes: + new.additional_attributes[attribute] = defaults.get( + type(value) + ) + elif new.additional_attributes[attribute] == value: + del new.additional_attributes[attribute] + + delete_links = CapellaPolarionWorker.get_missing_link_ids( + old.linked_work_items, new.linked_work_items + ) + create_links = CapellaPolarionWorker.get_missing_link_ids( + new.linked_work_items, old.linked_work_items + ) + else: + new.additional_attributes = {} + new.type = None + new.status = None + new.description = None + new.description_type = None + new.title = None + try: self.client.update_work_item(new) - if delete_link_ids := CapellaPolarionWorker.get_missing_link_ids( - old.linked_work_items, new.linked_work_items - ): - id_list_str = ", ".join(delete_link_ids.keys()) + if delete_links: + id_list_str = ", ".join(delete_links.keys()) logger.info( "Delete work item links %r for model %s %r", id_list_str, new.type, new.title, ) - self.client.delete_work_item_links( - list(delete_link_ids.values()) - ) + self.client.delete_work_item_links(list(delete_links.values())) - if create_links := CapellaPolarionWorker.get_missing_link_ids( - new.linked_work_items, old.linked_work_items - ): + if create_links: id_list_str = ", ".join(create_links.keys()) logger.info( "Create work item links %r for model %s %r", @@ -230,6 +289,106 @@ def patch_work_item( error.args[0], ) + def _refactor_attached_images(self, new: data_models.CapellaWorkItem): + def set_attachment_id(node: etree._Element) -> None: + if node.tag != "img": + return + if img_src := node.attrib.get("src"): + if img_src.startswith("workitemimg:"): + file_name = img_src[12:] + for attachment in new.attachments: + if attachment.file_name == file_name: + node.attrib["src"] = f"workitemimg:{attachment.id}" + return + + logger.error( + "Did not find attachment ID for file name %s", + file_name, + ) + + if new.description: + new.description = chelpers.process_html_fragments( + new.description, set_attachment_id + ) + for _, attributes in new.additional_attributes.items(): + if ( + isinstance(attributes, dict) + and attributes.get("type") == "text/html" + and attributes.get("value") is not None + ): + attributes["value"] = chelpers.process_html_fragments( + attributes["value"], set_attachment_id + ) + + def update_attachments( + self, + new: data_models.CapellaWorkItem, + old_checksums: dict[str, str], + new_checksums: dict[str, str], + old_attachments: list[polarion_api.WorkItemAttachment], + ): + """Delete, create and update attachments in one go. + + After execution all attachments of the new work item should have + IDs. + """ + new_attachment_dict = { + attachment.file_name: attachment for attachment in new.attachments + } + old_attachment_dict = { + attachment.file_name: attachment for attachment in old_attachments + } + + for attachment in old_attachments: + if attachment not in old_attachment_dict.values(): + logger.error( + "There are already multiple attachments named %s. " + "Attachment with ID %s will be deleted for that reason" + " - please report this as issue.", + attachment.file_name, + attachment.id, + ) + self.client.delete_work_item_attachment(attachment) + + old_attachment_file_names = set(old_attachment_dict) + new_attachment_file_names = set(new_attachment_dict) + for file_name in old_attachment_file_names - new_attachment_file_names: + self.client.delete_work_item_attachment( + old_attachment_dict[file_name] + ) + + if new_attachments := list( + map( + new_attachment_dict.get, + new_attachment_file_names - old_attachment_file_names, + ) + ): + self.client.create_work_item_attachments(new_attachments) + + attachments_for_update = {} + for common_attachment_file_name in ( + old_attachment_file_names & new_attachment_file_names + ): + attachment = new_attachment_dict[common_attachment_file_name] + attachment.id = old_attachment_dict[common_attachment_file_name].id + if ( + new_checksums.get(attachment.file_name) + != old_checksums.get(attachment.file_name) + or self.force_update + or attachment.mime_type == "image/svg+xml" + ): + attachments_for_update[attachment.file_name] = attachment + + for file_name, attachment in attachments_for_update.items(): + # SVGs should only be updated if their PNG differs + if ( + attachment.mime_type == "image/svg+xml" + and file_name[:-3] + "png" not in attachments_for_update + ): + continue + + self.client.update_work_item_attachment(attachment) + @staticmethod def get_missing_link_ids( left: cabc.Iterable[polarion_api.WorkItemLink], diff --git a/capella2polarion/converters/converter_config.py b/capella2polarion/converters/converter_config.py index 9308cd06..88e20ed0 100644 --- a/capella2polarion/converters/converter_config.py +++ b/capella2polarion/converters/converter_config.py @@ -24,7 +24,7 @@ class CapellaTypeConfig: def __post_init__(self): """Post processing for the initialization.""" - self.converters = _force_list(self.converters) or ["generic_work_item"] + self.converters = _force_list(self.converters) def _default_type_conversion(c_type: str) -> str: diff --git a/capella2polarion/converters/element_converter.py b/capella2polarion/converters/element_converter.py index ec3fb9da..50bf53cc 100644 --- a/capella2polarion/converters/element_converter.py +++ b/capella2polarion/converters/element_converter.py @@ -3,8 +3,8 @@ """Objects for serialization of capella objects to workitems.""" from __future__ import annotations -import base64 import collections +import hashlib import logging import mimetypes import pathlib @@ -12,8 +12,10 @@ import typing as t from collections import abc as cabc +import cairosvg import capellambse import markupsafe +import polarion_rest_api_client as polarion_api from capellambse import helpers as chelpers from capellambse.model import common from capellambse.model.crosslayer import interaction @@ -31,7 +33,7 @@ f"" ) RE_CAMEL_CASE_2ND_WORD_PATTERN = re.compile(r"([a-z]+)([A-Z][a-z]+)") -DIAGRAM_STYLES = {"max-width": "100%"} + POLARION_WORK_ITEM_URL = ( '' @@ -43,6 +45,7 @@ ] logger = logging.getLogger(__name__) +C2P_IMAGE_PREFIX = "__C2P__" def resolve_element_type(type_: str) -> str: @@ -57,22 +60,6 @@ def strike_through(string: str) -> str: return f'{string}' -def _decode_diagram(diagram_path: pathlib.Path) -> str: - mime_type, _ = mimetypes.guess_type(diagram_path) - if mime_type is None: - logger.error( - "Do not understand the MIME subtype for the diagram '%s'!", - diagram_path, - ) - return "" - content = diagram_path.read_bytes() - content_encoded = base64.standard_b64encode(content) - assert mime_type is not None - image_data = b"data:" + mime_type.encode() + b";base64," + content_encoded - src = image_data.decode() - return src - - def _format_texts( type_texts: dict[str, list[str]] ) -> dict[str, dict[str, str]]: @@ -119,12 +106,15 @@ def _condition( return {"type": _type, "value": value} -def _generate_image_html(src: str) -> str: +def _generate_image_html( + title: str, attachment_id: str, max_width: int, cls: str +) -> str: """Generate an image as HTMl with the given source.""" - style = "; ".join( - (f"{key}: {value}" for key, value in DIAGRAM_STYLES.items()) + description = ( + f'' ) - description = f'

' return description @@ -136,15 +126,15 @@ class CapellaWorkItemSerializer: def __init__( self, - diagram_cache_path: pathlib.Path, model: capellambse.MelodyModel, capella_polarion_mapping: polarion_repo.PolarionDataRepository, converter_session: data_session.ConverterSession, + generate_attachments: bool, ): - self.diagram_cache_path = diagram_cache_path self.model = model self.capella_polarion_mapping = capella_polarion_mapping self.converter_session = converter_session + self.generate_attachments = generate_attachments def serialize_all(self) -> list[data_models.CapellaWorkItem]: """Serialize all items of the converter_session.""" @@ -154,7 +144,12 @@ def serialize_all(self) -> list[data_models.CapellaWorkItem]: def serialize(self, uuid: str) -> data_models.CapellaWorkItem | None: """Return a CapellaWorkItem for the given diagram or element.""" converter_data = self.converter_session[uuid] - self._generic_work_item(converter_data) + work_item_id = None + if old := self.capella_polarion_mapping.get_work_item_by_capella_uuid( + uuid + ): + work_item_id = old.id + self.__generic_work_item(converter_data, work_item_id) for converter in converter_data.type_config.converters or []: try: @@ -169,40 +164,120 @@ def serialize(self, uuid: str) -> data_models.CapellaWorkItem | None: converter_data.work_item = None return None # Force to not overwrite on failure assert converter_data.work_item is not None - old = self.capella_polarion_mapping.get_work_item_by_capella_uuid(uuid) - if old: - converter_data.work_item.id = old.id + return converter_data.work_item + def _add_attachment( + self, + work_item: data_models.CapellaWorkItem, + attachment: polarion_api.WorkItemAttachment, + ): + attachment.work_item_id = work_item.id or "" + work_item.attachments.append(attachment) + if attachment.mime_type == "image/svg+xml": + work_item.attachments.append( + polarion_api.WorkItemAttachment( + attachment.work_item_id, + "", + attachment.title, + cairosvg.svg2png(attachment.content_bytes), + "image/png", + attachment.file_name[:-3] + "png", + ) + ) + + def _draw_diagram_svg( + self, + diagram: capellambse.model.diagram.Diagram, + file_name: str, + title: str, + max_width: int, + cls: str, + ) -> tuple[str, polarion_api.WorkItemAttachment | None]: + file_name = f"{C2P_IMAGE_PREFIX}{file_name}.svg" + + if self.generate_attachments: + try: + diagram_svg = diagram.render("svg") + except Exception as error: + logger.exception( + "Failed to get diagram from cache. Error: %s", error + ) + diagram_svg = diagram.as_svg + + if isinstance(diagram_svg, str): + diagram_svg = diagram_svg.encode("utf8") + + attachment = polarion_api.WorkItemAttachment( + "", + "", + title, + diagram_svg, + "image/svg+xml", + file_name, + ) + else: + attachment = None + + return ( + _generate_image_html(title, file_name, max_width, cls), + attachment, + ) + + def _draw_additional_attributes_diagram( + self, + work_item: data_models.CapellaWorkItem, + diagram: capellambse.model.diagram.Diagram, + attribute: str, + title: str, + ): + diagram_html, attachment = self._draw_diagram_svg( + diagram, attribute, title, 650, "additional-attributes-diagram" + ) + if attachment: + self._add_attachment(work_item, attachment) + work_item.additional_attributes[attribute] = { + "type": "text/html", + "value": diagram_html, + } + def _diagram( self, converter_data: data_session.ConverterData ) -> data_models.CapellaWorkItem: """Serialize a diagram for Polarion.""" - diag = converter_data.capella_element - diagram_path = self.diagram_cache_path / f"{diag.uuid}.svg" - src = _decode_diagram(diagram_path) - description = _generate_image_html(src) + diagram = converter_data.capella_element + assert converter_data.work_item is not None + work_item_id = converter_data.work_item.id + + diagram_html, attachment = self._draw_diagram_svg( + diagram, "diagram", "Diagram", 750, "diagram" + ) + converter_data.work_item = data_models.CapellaWorkItem( + id=work_item_id, type=converter_data.type_config.p_type, - title=diag.name, + title=diagram.name, description_type="text/html", - description=description, + description=diagram_html, status="open", - uuid_capella=diag.uuid, + uuid_capella=diagram.uuid, ) + if attachment: + self._add_attachment(converter_data.work_item, attachment) return converter_data.work_item - def _generic_work_item( - self, converter_data: data_session.ConverterData + def __generic_work_item( + self, converter_data: data_session.ConverterData, work_item_id ) -> data_models.CapellaWorkItem: obj = converter_data.capella_element raw_description = getattr(obj, "description", None) - uuids, value = self._sanitize_description( + uuids, value, attachments = self._sanitize_description( obj, raw_description or markupsafe.Markup("") ) converter_data.description_references = uuids requirement_types = _get_requirement_types_text(obj) converter_data.work_item = data_models.CapellaWorkItem( + id=work_item_id, type=converter_data.type_config.p_type, title=obj.name, description_type="text/html", @@ -211,19 +286,26 @@ def _generic_work_item( uuid_capella=obj.uuid, **requirement_types, ) + for attachment in attachments: + self._add_attachment(converter_data.work_item, attachment) + return converter_data.work_item def _sanitize_description( self, obj: common.GenericElement, descr: markupsafe.Markup - ) -> tuple[list[str], markupsafe.Markup]: + ) -> tuple[ + list[str], markupsafe.Markup, list[polarion_api.WorkItemAttachment] + ]: referenced_uuids: list[str] = [] replaced_markup = RE_DESCR_LINK_PATTERN.sub( lambda match: self._replace_markup(match, referenced_uuids, 2), descr, ) + attachments: list[polarion_api.WorkItemAttachment] = [] + def repair_images(node: etree._Element) -> None: - if node.tag != "img": + if node.tag != "img" or not self.generate_attachments: return file_url = pathlib.PurePosixPath(node.get("src")) @@ -236,8 +318,22 @@ def repair_images(node: etree._Element) -> None: ] try: with filehandler.open(file_path, "r") as img: - b64_img = base64.b64encode(img.read()).decode("utf8") - node.attrib["src"] = f"data:{mime_type};base64,{b64_img}" + content = img.read() + file_name = hashlib.md5( + str(file_path).encode("utf8") + ).hexdigest() + attachments.append( + polarion_api.WorkItemAttachment( + "", + "", + file_path.name, + content, + mime_type, + f"{file_name}.{file_path.suffix}", + ) + ) + node.attrib["src"] = f"workitemimg:{file_name}" + except FileNotFoundError: logger.error( "Inline image can't be found from %r for %r", @@ -248,7 +344,7 @@ def repair_images(node: etree._Element) -> None: repaired_markup = chelpers.process_html_fragments( replaced_markup, repair_images ) - return referenced_uuids, repaired_markup + return referenced_uuids, repaired_markup, attachments def _replace_markup( self, @@ -305,17 +401,19 @@ def matcher(match: re.Match) -> str: def _get_linked_text( self, converter_data: data_session.ConverterData - ) -> markupsafe.Markup: + ) -> tuple[markupsafe.Markup, list[polarion_api.WorkItemAttachment]]: """Return sanitized markup of the given ``obj`` linked text.""" obj = converter_data.capella_element default = {"capella:linkedText": markupsafe.Markup("")} description = getattr(obj, "specification", default)[ "capella:linkedText" ].striptags() - uuids, value = self._sanitize_description(obj, description) + uuids, value, attachments = self._sanitize_description( + obj, description + ) if uuids: converter_data.description_references = uuids - return value + return value, attachments def _linked_text_as_description( self, converter_data: data_session.ConverterData @@ -323,9 +421,11 @@ def _linked_text_as_description( """Return attributes for a ``Constraint``.""" # pylint: disable-next=attribute-defined-outside-init assert converter_data.work_item, "No work item set yet" - converter_data.work_item.description = self._get_linked_text( - converter_data - ) + ( + converter_data.work_item.description, + attachments, + ) = self._get_linked_text(converter_data) + converter_data.work_item.attachments += attachments return converter_data.work_item def _add_context_diagram( @@ -334,10 +434,14 @@ def _add_context_diagram( """Add a new custom field context diagram.""" assert converter_data.work_item, "No work item set yet" diagram = converter_data.capella_element.context_diagram - converter_data.work_item.additional_attributes["context_diagram"] = { - "type": "text/html", - "value": _generate_image_html(diagram.as_datauri_svg), - } + + self._draw_additional_attributes_diagram( + converter_data.work_item, + diagram, + "context_diagram", + "Context Diagram", + ) + return converter_data.work_item def _add_tree_diagram( @@ -346,8 +450,9 @@ def _add_tree_diagram( """Add a new custom field tree diagram.""" assert converter_data.work_item, "No work item set yet" diagram = converter_data.capella_element.tree_view - converter_data.work_item.additional_attributes["tree_view"] = { - "type": "text/html", - "value": _generate_image_html(diagram.as_datauri_svg), - } + + self._draw_additional_attributes_diagram( + converter_data.work_item, diagram, "tree_view", "Tree View" + ) + return converter_data.work_item diff --git a/capella2polarion/converters/link_converter.py b/capella2polarion/converters/link_converter.py index 89e1b745..8ac5de53 100644 --- a/capella2polarion/converters/link_converter.py +++ b/capella2polarion/converters/link_converter.py @@ -136,9 +136,12 @@ def _handle_diagram_reference_links( refs = set(self._collect_uuids(obj.nodes)) refs = set(self._get_work_item_ids(work_item_id, refs, role_id)) ref_links = self._create(work_item_id, role_id, refs, links) - except StopIteration: + except Exception as err: logger.exception( - "Could not create links for diagram %r", obj._short_repr_() + "Could not create links for diagram %r, " + "because an error occured %s", + obj._short_repr_(), + err, ) ref_links = [] return ref_links diff --git a/capella2polarion/converters/model_converter.py b/capella2polarion/converters/model_converter.py index 66bafd24..a1548f90 100644 --- a/capella2polarion/converters/model_converter.py +++ b/capella2polarion/converters/model_converter.py @@ -5,7 +5,6 @@ from __future__ import annotations import logging -import pathlib import typing as t import capellambse @@ -29,18 +28,15 @@ class ModelConverter: def __init__( self, model: capellambse.MelodyModel, - diagram_cache_path: pathlib.Path, project_id: str, ): self.model = model - self.diagram_cache_path = diagram_cache_path self.project_id = project_id self.converter_session: data_session.ConverterSession = {} def read_model( self, config: converter_config.ConverterConfig, - diagram_idx: list[dict[str, t.Any]], ): """Read the model using a given config and diagram_idx.""" missing_types: set[tuple[str, str, dict[str, t.Any]]] = set() @@ -65,16 +61,10 @@ def read_model( missing_types.add((layer, c_type, attributes)) if config.diagram_config: - diagrams_from_cache = { - d["uuid"] for d in diagram_idx if d["success"] - } for d in self.model.diagrams: - if d.uuid in diagrams_from_cache: - self.converter_session[ - d.uuid - ] = data_session.ConverterData( - "", config.diagram_config, d - ) + self.converter_session[d.uuid] = data_session.ConverterData( + "", config.diagram_config, d + ) if missing_types: for missing_type in missing_types: @@ -91,18 +81,30 @@ def generate_work_items( self, polarion_data_repo: polarion_repo.PolarionDataRepository, generate_links: bool = False, + generate_attachments: bool = False, ) -> dict[str, data_models.CapellaWorkItem]: """Return a work items mapping from model elements for Polarion. The dictionary maps Capella UUIDs to ``CapellaWorkItem`` s. In addition, it is ensured that neither title nor type are None, Links are not created in this step by default. + + Parameters + ---------- + polarion_data_repo + The PolarionDataRepository object storing current work item + data. + generate_links + A boolean flag to control linked work item generation. + generate_attachments + A boolean flag to control attachments generation. For SVG + attachments, PNGs are generated and attached automatically. """ serializer = element_converter.CapellaWorkItemSerializer( - self.diagram_cache_path, self.model, polarion_data_repo, self.converter_session, + generate_attachments, ) work_items = serializer.serialize_all() for work_item in work_items: diff --git a/capella2polarion/data_models.py b/capella2polarion/data_models.py index 2f750e7e..55f8ab3a 100644 --- a/capella2polarion/data_models.py +++ b/capella2polarion/data_models.py @@ -3,6 +3,9 @@ """Module providing the CapellaWorkItem class.""" from __future__ import annotations +import base64 +import hashlib +import json import typing as t import polarion_rest_api_client as polarion_api @@ -20,3 +23,40 @@ class Condition(t.TypedDict): uuid_capella: str preCondition: Condition | None postCondition: Condition | None + + def calculate_checksum(self) -> str: + """Calculate and return a checksum for this WorkItem. + + In addition, the checksum will be written to self._checksum. + """ + data = self.to_dict() + del data["checksum"] + del data["id"] + + attachments = data.pop("attachments") + attachment_checksums = {} + for attachment in attachments: + # Don't store checksums for SVGs as we can check their PNGs instead + if attachment["mime_type"] == "image/svg+xml": + continue + try: + attachment["content_bytes"] = base64.b64encode( + attachment["content_bytes"] + ).decode("utf8") + except TypeError: + pass + + del attachment["id"] + attachment_checksums[attachment["file_name"]] = hashlib.sha256( + json.dumps(attachment).encode("utf8") + ).hexdigest() + + data = dict(sorted(data.items())) + + converted = json.dumps(data).encode("utf8") + # pylint: disable-next=attribute-defined-outside-init + self._checksum = json.dumps( + {"__C2P__WORK_ITEM": hashlib.sha256(converted).hexdigest()} + | dict(sorted(attachment_checksums.items())) + ) + return self._checksum diff --git a/ci-templates/gitlab/synchronise_elements.yml b/ci-templates/gitlab/synchronise_elements.yml index 94d4a0fe..dfcae1ea 100644 --- a/ci-templates/gitlab/synchronise_elements.yml +++ b/ci-templates/gitlab/synchronise_elements.yml @@ -18,6 +18,5 @@ capella2polarion_synchronise_elements: $([[ $CAPELLA2POLARION_FORCE_UPDATE -eq 1 ]] && echo '--force-update') \ --polarion-project-id=${CAPELLA2POLARION_PROJECT_ID:?} \ --capella-model="${CAPELLA2POLARION_MODEL_JSON:?}" \ - --capella-diagram-cache-folder-path=./diagram_cache \ --synchronize-config=${CAPELLA2POLARION_CONFIG:?} \ synchronize diff --git a/docs/source/pipeline templates/gitlab.rst b/docs/source/pipeline templates/gitlab.rst index 060787de..fb9f6b21 100644 --- a/docs/source/pipeline templates/gitlab.rst +++ b/docs/source/pipeline templates/gitlab.rst @@ -12,7 +12,9 @@ following template can be used inside the `.gitlab-ci.yml` file: :language: yaml :lines: 4- -A `.gitlab-ci.yml` with a capella2polarion synchronization job could look like +We highly recommend using the diagram cache as a separate job and defined it as a dependency in our template for that +reason. The diagram cache artifacts have to be included in the capella2polarion job and its path must be defined in the +`CAPELLA2POLARION_MODEL_JSON` variable. A `.gitlab-ci.yml` with a capella2polarion synchronization job could look like this: .. code:: yaml @@ -33,6 +35,6 @@ this: CAPELLA_VERSION: 6.1.0 ENTRYPOINT: model.aird CAPELLA2POLARION_PROJECT_ID: syncproj - CAPELLA2POLARION_MODEL_JSON: '{"path": "PATH_TO_CAPELLA"}' + CAPELLA2POLARION_MODEL_JSON: '{"path": "PATH_TO_CAPELLA", "diagram_cache": "./diagram_cache"}' CAPELLA2POLARION_CONFIG: capella2polarion_config.yaml CAPELLA2POLARION_DEBUG: 1 diff --git a/pyproject.toml b/pyproject.toml index dbbcceb6..ede3b58d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,7 +33,8 @@ dependencies = [ "click", "PyYAML", "polarion-rest-api-client==0.3.0", - "bidict" + "bidict", + "cairosvg", ] [project.urls] diff --git a/tests/conftest.py b/tests/conftest.py index cbf1ea89..76ce35de 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -18,7 +18,10 @@ TEST_DIAGRAM_CACHE = TEST_DATA_ROOT / "diagram_cache" TEST_MODEL_ELEMENTS = TEST_DATA_ROOT / "model_elements" TEST_MODEL_ELEMENTS_CONFIG = TEST_MODEL_ELEMENTS / "config.yaml" -TEST_MODEL = TEST_DATA_ROOT / "model" / "Melody Model Test.aird" +TEST_MODEL = { + "path": str(TEST_DATA_ROOT / "model" / "Melody Model Test.aird"), + "diagram_cache": str(TEST_DIAGRAM_CACHE), +} TEST_HOST = "https://api.example.com" @@ -32,7 +35,7 @@ def diagram_cache_index() -> list[dict[str, t.Any]]: @pytest.fixture def model() -> capellambse.MelodyModel: """Return the test model.""" - return capellambse.MelodyModel(path=TEST_MODEL) + return capellambse.MelodyModel(**TEST_MODEL) @pytest.fixture diff --git a/tests/test_cli.py b/tests/test_cli.py index 5fbadd92..77777f6c 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -3,6 +3,7 @@ from __future__ import annotations +import json from unittest import mock import polarion_rest_api_client as polarion_api @@ -50,10 +51,8 @@ def test_migrate_model_elements(monkeypatch: pytest.MonkeyPatch): "--polarion-pat", "AlexandersPrivateAcessToken", "--polarion-delete-work-items", - "--capella-diagram-cache-folder-path", - str(TEST_DIAGRAM_CACHE), "--capella-model", - str(TEST_MODEL), + json.dumps(TEST_MODEL), "--synchronize-config", str(TEST_MODEL_ELEMENTS_CONFIG), "synchronize", diff --git a/tests/test_elements.py b/tests/test_elements.py index 6afb4f3a..abacb4e4 100644 --- a/tests/test_elements.py +++ b/tests/test_elements.py @@ -4,7 +4,6 @@ from __future__ import annotations import logging -import pathlib import typing as t from unittest import mock @@ -55,7 +54,9 @@ TEST_CAP_REAL = "b80b3141-a7fc-48c7-84b2-1467dcef5fce" TEST_CONSTRAINT = "95cbd4af-7224-43fe-98cb-f13dda540b8e" TEST_DIAG_DESCR = ( - '

' ) TEST_SER_DIAGRAM: dict[str, t.Any] = { "id": "Diag-1", @@ -68,7 +69,8 @@ }, } TEST_WI_CHECKSUM = ( - "d7916c4c529d588dcfdfa30c78a04dcf5b50089440a767ca962e24b94fb65c5d" + '{"__C2P__WORK_ITEM": ' + '"be783ea9b9144856394222dde865ebc925f31e497e8aabb93aa53b97adf22035"}' ) TEST_REQ_TEXT = ( "

Test requirement 1 really l o n g text that is way too long to " @@ -165,7 +167,6 @@ def write(self, text: str): polarion_url=TEST_HOST, polarion_pat="PrivateAccessToken", polarion_delete_work_items=True, - capella_diagram_cache_folder_path=TEST_DIAGRAM_CACHE, capella_model=model, synchronize_config_io=MyIO(), ) @@ -179,9 +180,7 @@ def write(self, text: str): c2p_cli.config = mock.Mock(converter_config.ConverterConfig) mc = model_converter.ModelConverter( - model, - c2p_cli.capella_diagram_cache_folder_path, - c2p_cli.polarion_params.project_id, + model, c2p_cli.polarion_params.project_id ) mc.converter_session = { @@ -202,16 +201,22 @@ def test_create_diagrams(base_object: BaseObjectContainer): pw = base_object.pw new_work_items: dict[str, data_models.CapellaWorkItem] new_work_items = base_object.mc.generate_work_items( - pw.polarion_data_repo + pw.polarion_data_repo, generate_attachments=True ) assert len(new_work_items) == 1 work_item = new_work_items[TEST_DIAG_UUID] assert isinstance(work_item, data_models.CapellaWorkItem) description = work_item.description work_item.description = None + work_item.attachments = [] assert work_item == data_models.CapellaWorkItem(**TEST_SER_DIAGRAM) assert isinstance(description, str) - assert description.startswith(TEST_DIAG_DESCR) + assert description == TEST_DIAG_DESCR.format( + title="Diagram", + attachment_id="__C2P__diagram.svg", + width=750, + cls="diagram", + ) @staticmethod def test_create_diagrams_filters_non_diagram_elements( @@ -283,7 +288,6 @@ def write(self, text: str): polarion_url=TEST_HOST, polarion_pat="PrivateAccessToken", polarion_delete_work_items=True, - capella_diagram_cache_folder_path=pathlib.Path(""), capella_model=model, synchronize_config_io=MyIO(), ) @@ -303,9 +307,7 @@ def write(self, text: str): ) mc = model_converter.ModelConverter( - model, - c2p_cli.capella_diagram_cache_folder_path, - c2p_cli.polarion_params.project_id, + model, c2p_cli.polarion_params.project_id ) mc.converter_session = { @@ -495,9 +497,6 @@ def test_create_links_custom_exchanges_resolver( work_item_obj_2, ) - base_object.c2pcli.synchronize_config_roles = { - "SystemFunction": ["input_exchanges"] - } expected = polarion_api.WorkItemLink( "Obj-1", "Obj-2", @@ -687,6 +686,9 @@ def test_update_work_items( assert base_object.pw.client.create_work_item_links.call_count == 0 assert base_object.pw.client.update_work_item.call_count == 1 assert base_object.pw.client.get_work_item.call_count == 1 + assert ( + base_object.pw.client.get_all_work_item_attachments.call_count == 0 + ) work_item = base_object.pw.client.update_work_item.call_args[0][0] assert isinstance(work_item, data_models.CapellaWorkItem) assert work_item.id == "Obj-1" @@ -1029,7 +1031,6 @@ def test_diagram(model: capellambse.MelodyModel): diag = model.diagrams.by_uuid(TEST_DIAG_UUID) serializer = element_converter.CapellaWorkItemSerializer( - TEST_DIAGRAM_CACHE, model, polarion_repo.PolarionDataRepository(), { @@ -1037,29 +1038,37 @@ def test_diagram(model: capellambse.MelodyModel): "", DIAGRAM_CONFIG, diag ) }, + True, ) serialized_diagram = serializer.serialize(TEST_DIAG_UUID) - if serialized_diagram is not None: - serialized_diagram.description = None + + assert serialized_diagram is not None + + attachment = serialized_diagram.attachments[0] + attachment.content_bytes = None + + assert attachment == polarion_api.WorkItemAttachment( + "", "", "Diagram", None, "image/svg+xml", "__C2P__diagram.svg" + ) + + serialized_diagram.attachments = [] assert serialized_diagram == data_models.CapellaWorkItem( type="diagram", uuid_capella=TEST_DIAG_UUID, title="[CC] Capability", description_type="text/html", + description=TEST_DIAG_DESCR.format( + title="Diagram", + attachment_id="__C2P__diagram.svg", + width=750, + cls="diagram", + ), status="open", linked_work_items=[], ) - @staticmethod - def test__decode_diagram(): - diagram_path = TEST_DIAGRAM_CACHE / "_APMboAPhEeynfbzU12yy7w.svg" - - diagram = element_converter._decode_diagram(diagram_path) - - assert diagram.startswith("data:image/svg+xml;base64,") - @staticmethod @pytest.mark.parametrize( "layer,uuid,expected", @@ -1215,7 +1224,6 @@ def test_generic_work_item( assert type_config is not None serializer = element_converter.CapellaWorkItemSerializer( - pathlib.Path(""), model, polarion_repo.PolarionDataRepository( [ @@ -1231,6 +1239,7 @@ def test_generic_work_item( obj, ) }, + False, ) work_item = serializer.serialize(uuid) @@ -1247,7 +1256,6 @@ def test_add_context_diagram(self, model: capellambse.MelodyModel): "test", "add_context_diagram", [] ) serializer = element_converter.CapellaWorkItemSerializer( - pathlib.Path(""), model, polarion_repo.PolarionDataRepository(), { @@ -1257,6 +1265,7 @@ def test_add_context_diagram(self, model: capellambse.MelodyModel): model.by_uuid(uuid), ) }, + True, ) work_item = serializer.serialize(uuid) @@ -1265,7 +1274,24 @@ def test_add_context_diagram(self, model: capellambse.MelodyModel): assert "context_diagram" in work_item.additional_attributes assert str( work_item.additional_attributes["context_diagram"]["value"] - ).startswith(TEST_DIAG_DESCR) + ) == TEST_DIAG_DESCR.format( + title="Context Diagram", + attachment_id="__C2P__context_diagram.svg", + width=650, + cls="additional-attributes-diagram", + ) + + attachment = work_item.attachments[0] + attachment.content_bytes = None + + assert attachment == polarion_api.WorkItemAttachment( + "", + "", + "Context Diagram", + None, + "image/svg+xml", + "__C2P__context_diagram.svg", + ) def test_multiple_serializers(self, model: capellambse.MelodyModel): cap = model.by_uuid(TEST_OCAP_UUID) @@ -1275,7 +1301,6 @@ def test_multiple_serializers(self, model: capellambse.MelodyModel): [], ) serializer = element_converter.CapellaWorkItemSerializer( - pathlib.Path(""), model, polarion_repo.PolarionDataRepository(), { @@ -1283,6 +1308,7 @@ def test_multiple_serializers(self, model: capellambse.MelodyModel): "pa", type_config, cap ) }, + True, ) work_item = serializer.serialize(TEST_OCAP_UUID) @@ -1293,4 +1319,9 @@ def test_multiple_serializers(self, model: capellambse.MelodyModel): assert "context_diagram" in work_item.additional_attributes assert str( work_item.additional_attributes["context_diagram"]["value"] - ).startswith(TEST_DIAG_DESCR) + ) == TEST_DIAG_DESCR.format( + title="Context Diagram", + attachment_id="__C2P__context_diagram.svg", + width=650, + cls="additional-attributes-diagram", + ) diff --git a/tests/test_workitem_attachments.py b/tests/test_workitem_attachments.py new file mode 100644 index 00000000..ffc8a8cd --- /dev/null +++ b/tests/test_workitem_attachments.py @@ -0,0 +1,431 @@ +# Copyright DB InfraGO AG and contributors +# SPDX-License-Identifier: Apache-2.0 +import base64 +import hashlib +import json +from unittest import mock + +import cairosvg +import capellambse +import polarion_rest_api_client as polarion_api +import pytest + +from capella2polarion import data_models +from capella2polarion.connectors import polarion_repo, polarion_worker +from capella2polarion.converters import ( + converter_config, + data_session, + model_converter, +) + +from .conftest import TEST_DIAGRAM_CACHE +from .test_elements import TEST_DIAG_DESCR + +DIAGRAM_WI_CHECKSUM = ( + "122dc8471ac3135a1af1e3b44ac76a9acd888a9e4add162e7433a94aa498598d" +) + +TEST_DIAG_UUID = "_APMboAPhEeynfbzU12yy7w" +WORKITEM_ID = "TEST-ID" + +with open( + TEST_DIAGRAM_CACHE / "_APMboAPhEeynfbzU12yy7w.svg", "r", encoding="utf8" +) as f: + diagram_svg = f.read() + +wia_dict = { + "work_item_id": WORKITEM_ID, + "title": "Diagram", + "content_bytes": base64.b64encode(cairosvg.svg2png(diagram_svg)).decode( + "utf8" + ), + "mime_type": "image/png", + "file_name": "__C2P__diagram.png", +} + +DIAGRAM_PNG_CHECKSUM = hashlib.sha256( + json.dumps(wia_dict).encode("utf8") +).hexdigest() +DIAGRAM_CHECKSUM = json.dumps( + { + "__C2P__WORK_ITEM": DIAGRAM_WI_CHECKSUM, + "__C2P__diagram.png": DIAGRAM_PNG_CHECKSUM, + } +) + + +@pytest.fixture +def worker(monkeypatch: pytest.MonkeyPatch): + mock_api = mock.MagicMock(spec=polarion_api.OpenAPIPolarionProjectClient) + monkeypatch.setattr(polarion_api, "OpenAPIPolarionProjectClient", mock_api) + config = mock.Mock(converter_config.ConverterConfig) + return polarion_worker.CapellaPolarionWorker( + polarion_worker.PolarionWorkerParams( + "TEST", + "http://localhost", + "TESTPAT", + False, + ), + config, + ) + + +def set_attachment_ids(attachments: list[polarion_api.WorkItemAttachment]): + counter = 0 + attachments = sorted(attachments, key=lambda a: a.file_name) + for attachment in attachments: + attachment.id = f"{counter}-{attachment.file_name}" + counter += 1 + + +def test_diagram_no_attachments(model: capellambse.MelodyModel): + converter = model_converter.ModelConverter(model, "TEST") + converter.converter_session[TEST_DIAG_UUID] = data_session.ConverterData( + "", + converter_config.CapellaTypeConfig("diagram", "diagram", []), + model.diagrams.by_uuid(TEST_DIAG_UUID), + ) + converter.generate_work_items( + polarion_repo.PolarionDataRepository(), False, False + ) + work_item = converter.converter_session[TEST_DIAG_UUID].work_item + assert work_item is not None + assert work_item.attachments == [] + + +def test_diagram_has_attachments(model: capellambse.MelodyModel): + converter = model_converter.ModelConverter(model, "TEST") + converter.converter_session[TEST_DIAG_UUID] = data_session.ConverterData( + "", + converter_config.CapellaTypeConfig("diagram", "diagram", []), + model.diagrams.by_uuid(TEST_DIAG_UUID), + ) + converter.generate_work_items( + polarion_repo.PolarionDataRepository(), False, True + ) + + work_item = converter.converter_session[TEST_DIAG_UUID].work_item + assert work_item is not None + assert len(work_item.attachments) == 2 + + +def test_diagram_attachments_new( + model: capellambse.MelodyModel, + worker: polarion_worker.CapellaPolarionWorker, +): + converter = model_converter.ModelConverter(model, "TEST") + worker.polarion_data_repo = polarion_repo.PolarionDataRepository( + [data_models.CapellaWorkItem(WORKITEM_ID, uuid_capella=TEST_DIAG_UUID)] + ) + + worker.client.get_work_item.return_value = data_models.CapellaWorkItem( + WORKITEM_ID, uuid_capella=TEST_DIAG_UUID + ) + worker.client.create_work_item_attachments = mock.MagicMock() + worker.client.create_work_item_attachments.side_effect = set_attachment_ids + + converter.converter_session[TEST_DIAG_UUID] = data_session.ConverterData( + "", + converter_config.CapellaTypeConfig("diagram", "diagram", []), + model.diagrams.by_uuid(TEST_DIAG_UUID), + ) + + converter.generate_work_items(worker.polarion_data_repo, False, True) + + worker.patch_work_item(TEST_DIAG_UUID, converter.converter_session) + + assert worker.client.update_work_item.call_count == 1 + assert worker.client.create_work_item_attachments.call_count == 1 + assert worker.client.get_all_work_item_attachments.call_count == 0 + + created_attachments: list[ + polarion_api.WorkItemAttachment + ] = worker.client.create_work_item_attachments.call_args.args[0] + work_item: data_models.CapellaWorkItem = ( + worker.client.update_work_item.call_args.args[0] + ) + + assert len(created_attachments) == 2 + assert created_attachments[0].title == created_attachments[1].title + assert ( + created_attachments[0].file_name[:3] + == created_attachments[0].file_name[:3] + ) + + assert work_item.description == TEST_DIAG_DESCR.format( + title="Diagram", + attachment_id="1-__C2P__diagram.svg", + width=750, + cls="diagram", + ) + assert work_item.get_current_checksum() == DIAGRAM_CHECKSUM + + +def test_diagram_attachments_updated( + model: capellambse.MelodyModel, + worker: polarion_worker.CapellaPolarionWorker, +): + converter = model_converter.ModelConverter(model, "TEST") + worker.polarion_data_repo = polarion_repo.PolarionDataRepository( + [data_models.CapellaWorkItem(WORKITEM_ID, uuid_capella=TEST_DIAG_UUID)] + ) + existing_attachments = [ + polarion_api.WorkItemAttachment( + WORKITEM_ID, + "SVG-ATTACHMENT", + "test", + file_name="__C2P__diagram.svg", + ), + polarion_api.WorkItemAttachment( + WORKITEM_ID, + "PNG-ATTACHMENT", + "test", + file_name="__C2P__diagram.png", + ), + ] + + worker.client.get_work_item.return_value = data_models.CapellaWorkItem( + WORKITEM_ID, + uuid_capella=TEST_DIAG_UUID, + attachments=existing_attachments, + ) + + worker.client.get_all_work_item_attachments = mock.MagicMock() + worker.client.get_all_work_item_attachments.return_value = ( + existing_attachments + ) + + converter.converter_session[TEST_DIAG_UUID] = data_session.ConverterData( + "", + converter_config.CapellaTypeConfig("diagram", "diagram", []), + model.diagrams.by_uuid(TEST_DIAG_UUID), + ) + + converter.generate_work_items(worker.polarion_data_repo, False, True) + + worker.patch_work_item(TEST_DIAG_UUID, converter.converter_session) + + assert worker.client.update_work_item.call_count == 1 + assert worker.client.create_work_item_attachments.call_count == 0 + assert worker.client.update_work_item_attachment.call_count == 2 + assert worker.client.get_all_work_item_attachments.call_count == 1 + + work_item: data_models.CapellaWorkItem = ( + worker.client.update_work_item.call_args.args[0] + ) + + assert work_item.description == TEST_DIAG_DESCR.format( + title="Diagram", + attachment_id="SVG-ATTACHMENT", + width=750, + cls="diagram", + ) + + +def test_diagram_attachments_unchanged_work_item_changed( + model: capellambse.MelodyModel, + worker: polarion_worker.CapellaPolarionWorker, +): + converter = model_converter.ModelConverter(model, "TEST") + worker.polarion_data_repo = polarion_repo.PolarionDataRepository( + [ + data_models.CapellaWorkItem( + WORKITEM_ID, + uuid_capella=TEST_DIAG_UUID, + checksum=json.dumps( + { + "__C2P__WORK_ITEM": "123", + "__C2P__diagram.png": DIAGRAM_PNG_CHECKSUM, + } + ), + ) + ] + ) + worker.client.get_all_work_item_attachments = mock.MagicMock() + worker.client.get_all_work_item_attachments.return_value = [ + polarion_api.WorkItemAttachment( + WORKITEM_ID, + "SVG-ATTACHMENT", + "test", + file_name="__C2P__diagram.svg", + ), + polarion_api.WorkItemAttachment( + WORKITEM_ID, + "PNG-ATTACHMENT", + "test", + file_name="__C2P__diagram.png", + ), + ] + + converter.converter_session[TEST_DIAG_UUID] = data_session.ConverterData( + "", + converter_config.CapellaTypeConfig("diagram", "diagram", []), + model.diagrams.by_uuid(TEST_DIAG_UUID), + ) + + converter.generate_work_items(worker.polarion_data_repo, False, True) + + worker.patch_work_item(TEST_DIAG_UUID, converter.converter_session) + + assert worker.client.update_work_item.call_count == 1 + assert worker.client.create_work_item_attachments.call_count == 0 + assert worker.client.update_work_item_attachment.call_count == 0 + + work_item: data_models.CapellaWorkItem = ( + worker.client.update_work_item.call_args.args[0] + ) + + assert work_item.description == TEST_DIAG_DESCR.format( + title="Diagram", + attachment_id="SVG-ATTACHMENT", + width=750, + cls="diagram", + ) + + +def test_diagram_attachments_fully_unchanged( + model: capellambse.MelodyModel, + worker: polarion_worker.CapellaPolarionWorker, +): + converter = model_converter.ModelConverter(model, "TEST") + worker.polarion_data_repo = polarion_repo.PolarionDataRepository( + [ + data_models.CapellaWorkItem( + WORKITEM_ID, + uuid_capella=TEST_DIAG_UUID, + checksum=DIAGRAM_CHECKSUM, + ) + ] + ) + + converter.converter_session[TEST_DIAG_UUID] = data_session.ConverterData( + "", + converter_config.CapellaTypeConfig("diagram", "diagram", []), + model.diagrams.by_uuid(TEST_DIAG_UUID), + ) + + converter.generate_work_items(worker.polarion_data_repo, False, True) + + worker.patch_work_item(TEST_DIAG_UUID, converter.converter_session) + + assert worker.client.update_work_item.call_count == 0 + assert worker.client.create_work_item_attachments.call_count == 0 + assert worker.client.update_work_item_attachment.call_count == 0 + assert worker.client.get_all_work_item_attachments.call_count == 0 + + +def test_add_context_diagram( + model: capellambse.MelodyModel, + worker: polarion_worker.CapellaPolarionWorker, +): + uuid = "11906f7b-3ae9-4343-b998-95b170be2e2b" + converter = model_converter.ModelConverter(model, "TEST") + worker.polarion_data_repo = polarion_repo.PolarionDataRepository( + [data_models.CapellaWorkItem(WORKITEM_ID, uuid_capella=uuid)] + ) + + converter.converter_session[uuid] = data_session.ConverterData( + "", + converter_config.CapellaTypeConfig("test", "add_context_diagram", []), + model.by_uuid(uuid), + ) + + worker.client.create_work_item_attachments = mock.MagicMock() + worker.client.create_work_item_attachments.side_effect = set_attachment_ids + + converter.generate_work_items(worker.polarion_data_repo, False, True) + + worker.patch_work_item(uuid, converter.converter_session) + + assert worker.client.update_work_item.call_count == 1 + assert worker.client.create_work_item_attachments.call_count == 1 + + created_attachments: list[ + polarion_api.WorkItemAttachment + ] = worker.client.create_work_item_attachments.call_args.args[0] + work_item: data_models.CapellaWorkItem = ( + worker.client.update_work_item.call_args.args[0] + ) + + assert len(created_attachments) == 2 + assert created_attachments[0].title == created_attachments[1].title + assert ( + created_attachments[0].file_name[:3] + == created_attachments[0].file_name[:3] + ) + + assert str( + work_item.additional_attributes["context_diagram"]["value"] + ) == TEST_DIAG_DESCR.format( + title="Context Diagram", + attachment_id="1-__C2P__context_diagram.svg", + width=650, + cls="additional-attributes-diagram", + ) + + +def test_diagram_delete_attachments( + model: capellambse.MelodyModel, + worker: polarion_worker.CapellaPolarionWorker, +): + converter = model_converter.ModelConverter(model, "TEST") + worker.polarion_data_repo = polarion_repo.PolarionDataRepository( + [ + data_models.CapellaWorkItem( + WORKITEM_ID, + uuid_capella=TEST_DIAG_UUID, + checksum=json.dumps( + { + "__C2P__WORK_ITEM": DIAGRAM_WI_CHECKSUM, + "__C2P__diagram.png": DIAGRAM_PNG_CHECKSUM, + "delete_me.png": "123", + } + ), + ) + ] + ) + worker.client.get_all_work_item_attachments = mock.MagicMock() + worker.client.get_all_work_item_attachments.return_value = [ + polarion_api.WorkItemAttachment( + WORKITEM_ID, + "SVG-ATTACHMENT", + "test", + file_name="__C2P__diagram.svg", + ), + polarion_api.WorkItemAttachment( + WORKITEM_ID, + "PNG-ATTACHMENT", + "test", + file_name="__C2P__diagram.png", + ), + polarion_api.WorkItemAttachment( + WORKITEM_ID, "SVG-DELETE", "test", file_name="delete_me.svg" + ), + polarion_api.WorkItemAttachment( + WORKITEM_ID, "PNG-DELETE", "test", file_name="delete_me.png" + ), + ] + + converter.converter_session[TEST_DIAG_UUID] = data_session.ConverterData( + "", + converter_config.CapellaTypeConfig("diagram", "diagram", []), + model.diagrams.by_uuid(TEST_DIAG_UUID), + ) + + converter.generate_work_items(worker.polarion_data_repo, False, True) + + worker.patch_work_item(TEST_DIAG_UUID, converter.converter_session) + + assert worker.client.update_work_item.call_count == 1 + assert worker.client.create_work_item_attachments.call_count == 0 + assert worker.client.update_work_item_attachment.call_count == 0 + assert worker.client.delete_work_item_attachment.call_count == 2 + + work_item: data_models.CapellaWorkItem = ( + worker.client.update_work_item.call_args.args[0] + ) + + assert work_item.description is None + assert work_item.additional_attributes == {} + assert work_item.title is None + assert work_item.get_current_checksum() == DIAGRAM_CHECKSUM