diff --git a/capella2polarion/__main__.py b/capella2polarion/__main__.py index 995424d..64d1ba7 100644 --- a/capella2polarion/__main__.py +++ b/capella2polarion/__main__.py @@ -109,6 +109,12 @@ def print_cli_state(capella2polarion_cli: Capella2PolarionCli) -> None: envvar="CAPELLA2POLARION_ROLE_PREFIX", default="", ) +@click.option( + "--grouped-links-custom-fields / --no-grouped-links-custom-fields", + envvar="CAPELLA2POLARION_GROUPED_LINKS_CUSTOM_FIELDS", + is_flag=True, + default=True, +) @click.pass_context def synchronize( ctx: click.core.Context, @@ -116,6 +122,7 @@ def synchronize( force_update: bool, type_prefix: str, role_prefix: str, + grouped_links_custom_fields: bool, ) -> None: """Synchronise model elements.""" capella_to_polarion_cli: Capella2PolarionCli = ctx.obj @@ -153,6 +160,7 @@ def synchronize( polarion_worker.polarion_data_repo, generate_links=True, generate_attachments=True, + generate_grouped_links_custom_fields=grouped_links_custom_fields, ) polarion_worker.compare_and_update_work_items(converter.converter_session) diff --git a/capella2polarion/converters/link_converter.py b/capella2polarion/converters/link_converter.py index ac2405b..522632b 100644 --- a/capella2polarion/converters/link_converter.py +++ b/capella2polarion/converters/link_converter.py @@ -49,6 +49,10 @@ def __init__( converter_config.DIAGRAM_ELEMENTS_SERIALIZER: self._handle_diagram_reference_links, # pylint: disable=line-too-long } + self._link_field_groups: dict[str, list[polarion_api.WorkItemLink]] = ( + defaultdict(list) + ) + def create_links_for_work_item( self, uuid: str ) -> list[polarion_api.WorkItemLink]: @@ -58,6 +62,7 @@ def create_links_for_work_item( work_item = converter_data.work_item assert work_item is not None assert work_item.id is not None + self._link_field_groups.clear() new_links: list[polarion_api.WorkItemLink] = [] link_errors: list[str] = [] for link_config in converter_data.type_config.links: @@ -66,9 +71,7 @@ def create_links_for_work_item( try: assert work_item.id is not None if serializer: - new_links.extend( - serializer(obj, work_item.id, role_id, {}) - ) + links = serializer(obj, work_item.id, role_id, {}) else: refs = _resolve_attribute(obj, link_config.capella_attr) new: cabc.Iterable[str] @@ -88,9 +91,10 @@ def create_links_for_work_item( new = set( self._get_work_item_ids(work_item.id, new, role_id) ) - new_links.extend( - self._create(work_item.id, role_id, new, {}) - ) + links = self._create(work_item.id, role_id, new, {}) + + new_links.extend(links) + self._link_field_groups[link_config.link_field].extend(links) except Exception as error: error_message = make_link_logging_message( f"{type(error).__name__} {str(error)}", @@ -223,23 +227,22 @@ def create_grouped_link_fields( assert work_item is not None wi = f"[{work_item.id}]({work_item.type} {work_item.title})" logger.debug("Building grouped links for work item %r...", wi) - for role, grouped_links in _group_by( - "role", work_item.linked_work_items - ).items(): - if (config := find_link_config(data, role)) is not None: - if back_links is not None and config.reverse_field: - for link in grouped_links: - back_links.setdefault( - link.secondary_work_item_id, {} - ).setdefault(config.reverse_field, []).append(link) - - if config.link_field: - self._create_link_fields( - work_item, - config.link_field, - grouped_links, - config=config, - ) + for link_config in data.type_config.links: + grouped_links = self._link_field_groups[link_config.link_field] + + if back_links is not None and link_config.reverse_field: + for link in grouped_links: + back_links.setdefault( + link.secondary_work_item_id, {} + ).setdefault(link_config.reverse_field, []).append(link) + + if grouped_links: + self._create_link_fields( + work_item, + link_config.link_field, + grouped_links, + config=link_config, + ) def _create_link_fields( self, diff --git a/capella2polarion/converters/model_converter.py b/capella2polarion/converters/model_converter.py index 64c9867..29ce3f1 100644 --- a/capella2polarion/converters/model_converter.py +++ b/capella2polarion/converters/model_converter.py @@ -83,6 +83,7 @@ def generate_work_items( polarion_data_repo: polarion_repo.PolarionDataRepository, generate_links: bool = False, generate_attachments: bool = False, + generate_grouped_links_custom_fields: bool = False, ) -> dict[str, data_models.CapellaWorkItem]: """Return a work items mapping from model elements for Polarion. @@ -100,6 +101,9 @@ def generate_work_items( generate_attachments A boolean flag to control attachments generation. For SVG attachments, PNGs are generated and attached automatically. + generate_grouped_links_custom_fields + A boolean flag to control grouped links custom fields + generation. """ serializer = element_converter.CapellaWorkItemSerializer( self.model, @@ -113,13 +117,16 @@ def generate_work_items( assert work_item.type is not None if generate_links: - self.generate_work_item_links(polarion_data_repo) + self.generate_work_item_links( + polarion_data_repo, generate_grouped_links_custom_fields + ) return {wi.uuid_capella: wi for wi in work_items} def generate_work_item_links( self, polarion_data_repo: polarion_repo.PolarionDataRepository, + generate_grouped_links_custom_fields: bool, ): """Generate links for all work items and add custom fields for them.""" back_links: dict[str, dict[str, list[polarion_api.WorkItemLink]]] = {} @@ -140,9 +147,10 @@ def generate_work_item_links( links = link_serializer.create_links_for_work_item(uuid) converter_data.work_item.linked_work_items = links - link_serializer.create_grouped_link_fields( - converter_data, back_links - ) + if generate_grouped_links_custom_fields: + link_serializer.create_grouped_link_fields( + converter_data, back_links + ) for uuid, converter_data in self.converter_session.items(): if converter_data.work_item is None: diff --git a/tests/conftest.py b/tests/conftest.py index 6aa2f42..c6b296c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -37,6 +37,12 @@ DOCUMENT_TEMPLATES = TEST_DOCUMENT_ROOT / "templates" DOCUMENT_TEXT_WORK_ITEMS = "document_work_items.html.j2" DOCUMENT_WORK_ITEMS_CROSS_PROJECT = "work_items_cross_project.html.j2" +LINK_CONFIG = converter_config.LinkConfig( + capella_attr="attribute", + polarion_role="attribute", + link_field="attribute", + reverse_field="attribute_reverse", +) @pytest.fixture @@ -103,16 +109,10 @@ class UnsupportedFakeModelObject(FakeModelObject): """A ``FakeModelObject`` which shouldn't be migrated.""" -class BaseObjectContainer: - def __init__( - self, - c2p_cli: cli.Capella2PolarionCli, - pw: polarion_worker.CapellaPolarionWorker, - mc: model_converter.ModelConverter, - ) -> None: - self.c2pcli: cli.Capella2PolarionCli = c2p_cli - self.pw: polarion_worker.CapellaPolarionWorker = pw - self.mc = mc +class BaseObjectContainer(t.NamedTuple): + c2pcli: cli.Capella2PolarionCli + pw: polarion_worker.CapellaPolarionWorker + mc: model_converter.ModelConverter # pylint: disable=redefined-outer-name @@ -141,12 +141,7 @@ def base_object( fake = FakeModelObject("uuid1", name="Fake 1") fake_model_type_config = converter_config.CapellaTypeConfig( - "fakeModelObject", - links=[ - converter_config.LinkConfig( - capella_attr="attribute", polarion_role="attribute" - ) - ], + "fakeModelObject", links=[LINK_CONFIG] ) mc = model_converter.ModelConverter( diff --git a/tests/test_cli.py b/tests/test_cli.py index 46e04af..49cecd5 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -130,6 +130,7 @@ def test_migrate_model_elements(cli_mocks: CLIMocks): "synchronize", "--synchronize-config", str(TEST_MODEL_ELEMENTS_CONFIG), + "--grouped-links-custom-fields", ] result = testing.CliRunner().invoke(main.cli, command, terminal_width=60) @@ -140,6 +141,7 @@ def test_migrate_model_elements(cli_mocks: CLIMocks): assert cli_mocks.generate_work_items.call_args_list[1][1] == { "generate_links": True, "generate_attachments": True, + "generate_grouped_links_custom_fields": True, } assert cli_mocks.delete_work_items.call_count == 1 assert cli_mocks.patch_work_items.call_count == 1 diff --git a/tests/test_elements.py b/tests/test_elements.py index 2abbe1b..0757634 100644 --- a/tests/test_elements.py +++ b/tests/test_elements.py @@ -5,6 +5,7 @@ import logging import typing as t +from collections import defaultdict from unittest import mock import capellambse @@ -26,6 +27,7 @@ # pylint: disable-next=relative-beyond-top-level, useless-suppression from .conftest import ( # type: ignore[import] + LINK_CONFIG, TEST_MODEL_ELEMENTS_CONFIG, BaseObjectContainer, FakeModelObject, @@ -175,15 +177,7 @@ def grouped_links_base_object( dummy_work_items: dict[str, data_models.CapellaWorkItem], ) -> GroupedLinksBaseObject: config = converter_config.CapellaTypeConfig( - "fakeModelObject", - links=[ - converter_config.LinkConfig( - capella_attr="attribute", - polarion_role="attribute", - link_field="attribute", - reverse_field="attribute_reverse", - ) - ], + "fakeModelObject", links=[LINK_CONFIG] ) mock_model = mock.MagicMock() fake_2 = FakeModelObject("uuid2", "Fake 2") @@ -992,6 +986,8 @@ def test_update_links(base_object: BaseObjectContainer): type="fakeModelObject", ) ) + for data in base_object.mc.converter_session.values(): + data.type_config.links[0].polarion_role = "attribute" base_object.pw.project_client.work_items.links.get_all.side_effect = ( [link], @@ -1001,7 +997,8 @@ def test_update_links(base_object: BaseObjectContainer): "Obj-2", "Obj-1", "attribute", None, "project_id" ) base_object.mc.generate_work_item_links( - base_object.pw.polarion_data_repo + base_object.pw.polarion_data_repo, + generate_grouped_links_custom_fields=True, ) work_item_1 = ( @@ -1118,7 +1115,8 @@ def mock_back_link(converter_data, back_links): ] base_object.mc.model = mock_model base_object.mc.generate_work_item_links( - base_object.pw.polarion_data_repo + base_object.pw.polarion_data_repo, + generate_grouped_links_custom_fields=True, ) base_object.pw.compare_and_update_work_items( base_object.mc.converter_session @@ -1185,10 +1183,14 @@ def test_maintain_grouped_links_attributes( converter_data = data_session.ConverterData( "test", config, [], work_item ) + link_serializer._link_field_groups["attribute"] = ( + work_item.linked_work_items + ) link_serializer.create_grouped_link_fields(converter_data) del dummy_work_items["uuid0"].additional_attributes["uuid_capella"] del dummy_work_items["uuid1"].additional_attributes["uuid_capella"] del dummy_work_items["uuid2"].additional_attributes["uuid_capella"] + assert ( dummy_work_items["uuid0"].additional_attributes.pop("attribute")[ "value" @@ -1244,13 +1246,63 @@ def test_maintain_grouped_links_attributes_with_role_prefix( converter_data = data_session.ConverterData( "test", config, [], work_item ) + if work_item.uuid_capella == "uuid0": + link_serializer._link_field_groups["attribute"] = ( + work_item.linked_work_items + ) + link_serializer.create_grouped_link_fields(converter_data) + link_serializer._link_field_groups = defaultdict(list) assert "attribute" in dummy_work_items["uuid0"].additional_attributes assert ( # Link Role on links were not prefixed "attribute" not in dummy_work_items["uuid1"].additional_attributes ) + @staticmethod + def test_grouped_links_attributes_different_link_field_in_config( + base_object: BaseObjectContainer, + ): + converter_data_1 = base_object.mc.converter_session["uuid1"] + converter_data_2 = base_object.mc.converter_session["uuid2"] + converter_data_2.work_item = data_models.CapellaWorkItem( + id="Obj-2", uuid_capella="uuid2", status="open" + ) + base_object.pw.polarion_data_repo.update_work_items( + [converter_data_2.work_item] + ) + converter_data_1.type_config.links.append( + converter_config.LinkConfig( + capella_attr="attribute1", + polarion_role="attribute", + link_field="attribute1", + reverse_field="attribute1_reverse", + ) + ) + converter_data_1.capella_element.attribute = ( + converter_data_2.capella_element + ) + converter_data_1.capella_element.attribute1 = ( + converter_data_2.capella_element + ) + expected_html = ( + "" + ) + + base_object.mc.generate_work_item_links( + base_object.pw.polarion_data_repo, + generate_grouped_links_custom_fields=True, + ) + + assert (link_group := getattr(converter_data_1.work_item, "attribute")) + assert ( + link_group1 := getattr(converter_data_1.work_item, "attribute1") + ) + assert link_group["value"] == link_group1["value"] == expected_html + @staticmethod def test_grouped_links_attributes_with_includes( base_object: BaseObjectContainer, model: capellambse.MelodyModel @@ -1352,6 +1404,9 @@ def test_maintain_reverse_grouped_links_attributes( data[work_item.id] = converter_data = data_session.ConverterData( "test", config, [], work_item ) + link_serializer._link_field_groups["attribute"] = ( + work_item.linked_work_items + ) link_serializer.create_grouped_link_fields( converter_data, back_links ) @@ -1395,6 +1450,9 @@ def test_maintain_reverse_grouped_links_unidirectional_config( FakeModelObject(work_item.uuid_capella), work_item, ) + link_serializer._link_field_groups["attribute"] = ( + work_item.linked_work_items + ) link_serializer.create_grouped_link_fields( converter_data, back_links ) @@ -1427,19 +1485,18 @@ def test_maintain_reverse_grouped_links_attributes_with_role_prefix( config.links[0].polarion_role = f"_C2P_{config.links[0].polarion_role}" back_links: dict[str, dict[str, list[polarion_api.WorkItemLink]]] = {} data = {} - for link in ( - dummy_work_items["uuid0"].linked_work_items - + dummy_work_items["uuid1"].linked_work_items - ): - link.role = f"_C2P_{link.role}" - for work_item in dummy_work_items.values(): + for link in work_item.linked_work_items: + link_serializer._link_field_groups[link.role].append(link) + link.role = f"_C2P_{link.role}" + data[work_item.id] = converter_data = data_session.ConverterData( "test", config, [], work_item ) link_serializer.create_grouped_link_fields( converter_data, back_links ) + for work_item_id, links in back_links.items(): assert (wi := data[work_item_id].work_item) is not None link_serializer.create_grouped_back_link_fields(wi, links)