Skip to content

Commit

Permalink
feat: Decrease amount of requests
Browse files Browse the repository at this point in the history
Merge pull request #12 from DSD-DBS/decrease-requests
  • Loading branch information
ewuerger authored Oct 25, 2023
2 parents fe22b4c + 35a9311 commit 01c25cc
Show file tree
Hide file tree
Showing 14 changed files with 693 additions and 467 deletions.
141 changes: 71 additions & 70 deletions capella2polarion/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,45 +83,68 @@ def _get_roles_from_config(ctx: dict[str, t.Any]) -> dict[str, list[str]]:


def _sanitize_config(
config: dict[str, list[str | dict[str, t.Any]]], special: dict[str, t.Any]
config: dict[str, list[str | dict[str, t.Any]]],
special: list[str | dict[str, t.Any]],
) -> dict[str, t.Any]:
special_config: dict[str, t.Any] = {}
for typ in special:
if isinstance(typ, str):
special_config[typ] = None
else:
special_config.update(typ)

lookup: dict[str, dict[str, list[str]]] = {}
for layer, xtypes in config.items():
for xt in xtypes:
if isinstance(xt, str):
item: dict[str, list[str]] = {xt: []}
else:
item = xt

lookup.setdefault(layer, {}).update(item)

new_config: dict[str, t.Any] = {}
for layer, xtypes in config.items():
new_entries: list[str | dict[str, t.Any]] = []
for xtype in xtypes:
if isinstance(xtype, dict):
for sub_key, sub_value in xtype.items():
new_value = (
special.get("*", [])
+ special.get(sub_key, [])
special_config.get("*", [])
+ special_config.get(sub_key, [])
+ sub_value
)
new_entries.append({sub_key: new_value})
else:
if new_value := special.get("*", []) + special.get(xtype, []):
star = special_config.get("*", [])
special_xtype = special_config.get(xtype, [])
if new_value := star + special_xtype:
new_entries.append({xtype: new_value})
else:
new_entries.append(xtype)

wildcard_values = special_config.get("*", [])
for key, value in special_config.items():
if key == "*":
continue

if isinstance(value, list):
new_value = (
lookup.get(layer, {}).get(key, [])
+ wildcard_values
+ value
)
new_entries.append({key: new_value})
elif value is None and key not in [
entry if isinstance(entry, str) else list(entry.keys())[0]
for entry in new_entries
]:
new_entries.append({key: wildcard_values})
new_config[layer] = new_entries

return new_config


def get_polarion_wi_map(
ctx: dict[str, t.Any], type_: str = ""
) -> dict[str, t.Any]:
"""Return a map from Capella UUIDs to Polarion work items."""
types_ = map(elements.helpers.resolve_element_type, ctx.get("TYPES", []))
work_item_types = [type_] if type_ else list(types_)
_type = " ".join(work_item_types)
work_items = ctx["API"].get_all_work_items(
f"type:({_type})", {"workitems": "id,uuid_capella,status"}
)
return {
wi.uuid_capella: wi for wi in work_items if wi.id and wi.uuid_capella
}


@click.group()
@click.option("--debug/--no-debug", is_flag=True, default=False)
@click.option("--project-id", required=True, type=str)
Expand All @@ -144,12 +167,14 @@ def cli(
polarion_api_endpoint=f"{ctx.obj['POLARION_HOST']}/rest/v1",
polarion_access_token=os.environ["POLARION_PAT"],
custom_work_item=serialize.CapellaWorkItem,
add_work_item_checksum=True,
)
if not ctx.obj["API"].project_exists():
sys.exit(1)


@cli.command()
@click.argument("model", type=cli_helpers.ModelCLI())
@click.argument(
"diagram_cache",
type=click.Path(
Expand All @@ -160,10 +185,16 @@ def cli(
path_type=pathlib.Path,
),
)
@click.argument("config_file", type=click.File(mode="r", encoding="utf8"))
@click.pass_context
def diagrams(ctx: click.core.Context, diagram_cache: pathlib.Path) -> None:
"""Synchronise diagrams."""
logger.debug(
def model_elements(
ctx: click.core.Context,
model: capellambse.MelodyModel,
diagram_cache: pathlib.Path,
config_file: t.TextIO,
) -> None:
"""Synchronise model elements."""
logger.info(
"Synchronising diagrams from diagram cache at '%s' "
"to Polarion project with id %r...",
diagram_cache,
Expand All @@ -176,29 +207,12 @@ def diagrams(ctx: click.core.Context, diagram_cache: pathlib.Path) -> None:

ctx.obj["DIAGRAM_CACHE"] = diagram_cache
ctx.obj["DIAGRAM_IDX"] = json.loads(idx_file.read_text(encoding="utf8"))
ctx.obj["CAPELLA_UUIDS"] = [
d["uuid"] for d in ctx.obj["DIAGRAM_IDX"] if d["success"]
]
ctx.obj["POLARION_WI_MAP"] = get_polarion_wi_map(ctx.obj, "diagram")
ctx.obj["POLARION_ID_MAP"] = {
uuid: wi.id for uuid, wi in ctx.obj["POLARION_WI_MAP"].items()
}

elements.delete_work_items(ctx.obj)
elements.diagram.update_diagrams(ctx.obj)
elements.diagram.create_diagrams(ctx.obj)


@cli.command()
@click.argument("model", type=cli_helpers.ModelCLI())
@click.argument("config_file", type=click.File(mode="r", encoding="utf8"))
@click.pass_context
def model_elements(
ctx: click.core.Context,
model: capellambse.MelodyModel,
config_file: t.TextIO,
) -> None:
"""Synchronise model elements."""
logger.info(
"Synchronising model elements (%r) to Polarion project with id %r...",
str(elements.ELEMENTS_IDX_PATH),
ctx.obj["PROJECT_ID"],
)
ctx.obj["MODEL"] = model
ctx.obj["CONFIG"] = yaml.safe_load(config_file)
ctx.obj["ROLES"] = _get_roles_from_config(ctx.obj)
Expand All @@ -208,39 +222,26 @@ def model_elements(
) = elements.get_elements_and_type_map(ctx.obj)
ctx.obj["CAPELLA_UUIDS"] = set(ctx.obj["POLARION_TYPE_MAP"])
ctx.obj["TYPES"] = elements.get_types(ctx.obj)
ctx.obj["POLARION_WI_MAP"] = get_polarion_wi_map(ctx.obj)
ctx.obj["POLARION_WI_MAP"] = elements.get_polarion_wi_map(ctx.obj)
ctx.obj["POLARION_ID_MAP"] = {
uuid: wi.id for uuid, wi in ctx.obj["POLARION_WI_MAP"].items()
}
duuids = {
diag["uuid"] for diag in ctx.obj["DIAGRAM_IDX"] if diag["success"]
}
ctx.obj["ELEMENTS"]["Diagram"] = [
diag for diag in ctx.obj["ELEMENTS"]["Diagram"] if diag.uuid in duuids
]

elements.delete_work_items(ctx.obj)
elements.element.update_work_items(ctx.obj)
elements.element.create_work_items(ctx.obj)
elements.delete_work_items(ctx.obj)
elements.post_work_items(ctx.obj)

ctx.obj["POLARION_WI_MAP"] = get_polarion_wi_map(ctx.obj)
ctx.obj["POLARION_ID_MAP"] = {
uuid: wi.id for uuid, wi in ctx.obj["POLARION_WI_MAP"].items()
}
elements.element.update_links(ctx.obj)

diagram_work = get_polarion_wi_map(ctx.obj, "diagram")
ctx.obj["POLARION_ID_MAP"] |= {
uuid: wi.id for uuid, wi in diagram_work.items()
}
_diagrams = [
diagram
for diagram in model.diagrams
if diagram.uuid in ctx.obj["POLARION_ID_MAP"]
]
ctx.obj["ROLES"]["Diagram"] = ["diagram_elements"]
elements.element.update_links(ctx.obj, _diagrams)
# Create missing links b/c of unresolved references
elements.element.create_work_items(ctx.obj)
elements.patch_work_items(ctx.obj)

elements_index_file = elements.make_model_elements_index(ctx.obj)
logger.debug(
"Synchronising model objects (%r) to Polarion project with id %r...",
str(elements_index_file),
ctx.obj["PROJECT_ID"],
)
elements.make_model_elements_index(ctx.obj)


if __name__ == "__main__":
Expand Down
114 changes: 109 additions & 5 deletions capella2polarion/elements/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"STATUS_DELETE",
]

import functools
import logging
import pathlib
import typing as t
Expand All @@ -20,6 +21,7 @@
import polarion_rest_api_client as polarion_api
import yaml
from capellambse.model import common
from capellambse.model import diagram as diag

logger = logging.getLogger(__name__)

Expand All @@ -34,7 +36,7 @@
"PhysicalComponentNode": "PhysicalComponent",
"PhysicalComponentBehavior": "PhysicalComponent",
}
POL2CAPELLA_TYPES = (
POL2CAPELLA_TYPES: dict[str, str] = (
{
"OperationalEntity": "Entity",
"OperationalInteraction": "FunctionalExchange",
Expand All @@ -45,6 +47,21 @@
)


def get_polarion_wi_map(
ctx: dict[str, t.Any], type_: str = ""
) -> dict[str, t.Any]:
"""Return a map from Capella UUIDs to Polarion work items."""
types_ = map(helpers.resolve_element_type, ctx.get("TYPES", []))
work_item_types = [type_] if type_ else list(types_)
_type = " ".join(work_item_types)
work_items = ctx["API"].get_all_work_items(
f"type:({_type})", {"workitems": "id,uuid_capella,checksum,status"}
)
return {
wi.uuid_capella: wi for wi in work_items if wi.id and wi.uuid_capella
}


def delete_work_items(ctx: dict[str, t.Any]) -> None:
"""Delete work items in a Polarion project.
Expand All @@ -58,7 +75,7 @@ def delete_work_items(ctx: dict[str, t.Any]) -> None:
"""

def serialize_for_delete(uuid: str) -> str:
logger.debug(
logger.info(
"Delete work item %r...",
workitem_id := ctx["POLARION_ID_MAP"][uuid],
)
Expand All @@ -78,6 +95,78 @@ def serialize_for_delete(uuid: str) -> str:
logger.error("Deleting work items failed. %s", error.args[0])


def post_work_items(ctx: dict[str, t.Any]) -> None:
"""Post work items in a Polarion project.
Parameters
----------
ctx
The context for the workitem operation to be processed.
"""
work_items: list[serialize.CapellaWorkItem] = []
for work_item in ctx["WORK_ITEMS"].values():
if work_item.uuid_capella in ctx["POLARION_ID_MAP"]:
continue

assert work_item is not None
work_items.append(work_item)
logger.info("Create work item for %r...", work_item.title)
if work_items:
try:
ctx["API"].create_work_items(work_items)
workitems = {wi.uuid_capella: wi for wi in work_items if wi.id}
ctx["POLARION_WI_MAP"].update(workitems)
ctx["POLARION_ID_MAP"] = {
uuid: wi.id for uuid, wi in ctx["POLARION_WI_MAP"].items()
}
except polarion_api.PolarionApiException as error:
logger.error("Creating work items failed. %s", error.args[0])


def patch_work_items(ctx: dict[str, t.Any]) -> None:
"""Update work items in a Polarion project.
Parameters
----------
ctx
The context for the workitem operation to be processed.
"""

def add_content(
obj: common.GenericElement | diag.Diagram,
ctx: dict[str, t.Any],
**kwargs,
) -> serialize.CapellaWorkItem:
work_item = ctx["WORK_ITEMS"][obj.uuid]
for key, value in kwargs.items():
if getattr(work_item, key, None) is None:
continue

setattr(work_item, key, value)
return work_item

ctx["POLARION_ID_MAP"] = uuids = {
uuid: wi.id
for uuid, wi in ctx["POLARION_WI_MAP"].items()
if wi.status == "open" and wi.uuid_capella and wi.id
}
for uuid in uuids:
elements = ctx["MODEL"]
if uuid.startswith("_"):
elements = ctx["MODEL"].diagrams
obj = elements.by_uuid(uuid)

links = element.create_links(obj, ctx)

api_helper.patch_work_item(
ctx,
obj,
functools.partial(add_content, linked_work_items=links),
obj._short_repr_(),
"element",
)


def get_types(ctx: dict[str, t.Any]) -> set[str]:
"""Return a set of Polarion types from the current context."""
xtypes = set[str]()
Expand All @@ -100,13 +189,24 @@ def get_elements_and_type_map(
if isinstance(typ, dict):
typ = list(typ.keys())[0]

if typ == "Diagram":
continue

xtype = convert_type.get(typ, typ)
objects = ctx["MODEL"].search(xtype, below=below)
elements.setdefault(typ, []).extend(objects)
for obj in objects:
type_map[obj.uuid] = typ

_fix_components(elements, type_map)
diagrams_from_cache = {
d["uuid"] for d in ctx["DIAGRAM_IDX"] if d["success"]
}
elements["Diagram"] = [
d for d in ctx["MODEL"].diagrams if d.uuid in diagrams_from_cache
]
for obj in elements["Diagram"]:
type_map[obj.uuid] = "Diagram"
return elements, type_map


Expand Down Expand Up @@ -150,7 +250,7 @@ def _fix_components(
elements["PhysicalComponent"] = components


def make_model_elements_index(ctx: dict[str, t.Any]) -> pathlib.Path:
def make_model_elements_index(ctx: dict[str, t.Any]) -> None:
"""Create an elements index file for all migrated elements."""
elements: list[dict[str, t.Any]] = []
for obj in chain.from_iterable(ctx["ELEMENTS"].values()):
Expand All @@ -176,7 +276,11 @@ def make_model_elements_index(ctx: dict[str, t.Any]) -> pathlib.Path:
elements.append(element_)

ELEMENTS_IDX_PATH.write_text(yaml.dump(elements), encoding="utf8")
return ELEMENTS_IDX_PATH


from . import diagram, element, helpers, serialize
from . import ( # pylint: disable=cyclic-import
api_helper,
element,
helpers,
serialize,
)
Loading

0 comments on commit 01c25cc

Please sign in to comment.