diff --git a/docs/custom_diagram.md b/docs/custom_diagram.md
new file mode 100644
index 0000000..98d1c1c
--- /dev/null
+++ b/docs/custom_diagram.md
@@ -0,0 +1,47 @@
+
+
+# Custom Diagram
+
+`Custom diagram`s let you create custom diagrams based on the data in the model. You define the data collection using an iterable, and `Custom diagram` takes care of the rest.
+
+You can access `.custom_diagram` on any supported model element.
+
+??? example "Custom Diagram of `PP 1 `"
+
+ ``` py
+ import capellambse
+
+ def _collector(
+ target: m.ModelElement,
+ ) -> cabc.Iterator[m.ModelElement]:
+ visited = set()
+ def collector(
+ target: m.ModelElement,
+ ) -> cabc.Iterator[m.ModelElement]:
+ if target.uuid in visited:
+ return
+ visited.add(target.uuid)
+ for link in target.links:
+ yield link
+ yield from collector(link.source)
+ yield from collector(link.target)
+ yield from collector(target)
+
+ model = capellambse.MelodyModel("tests/data/ContextDiagram.aird")
+ obj = model.by_uuid("c403d4f4-9633-42a2-a5d6-9e1df2655146")
+ diag = obj.context_diagram
+ diag.render("svgdiagram", collect=_collector(obj)).save(pretty=True)
+ ```
+
+
+## Check out the code
+
+To understand the collection have a look into the
+[`custom`][capellambse_context_diagrams.collectors.custom]
+module.
diff --git a/docs/gen_images.py b/docs/gen_images.py
index f681c15..f446135 100644
--- a/docs/gen_images.py
+++ b/docs/gen_images.py
@@ -29,6 +29,7 @@
"Physical Node": "fdb34c92-7c49-491d-bf11-dd139930786e",
"Physical Behavior": "313f48f4-fb7e-47a8-b28a-76440932fcb9",
"Maintain": "ee745644-07d7-40b9-ad7a-910dc8cbb805",
+ "Physical Port": "c403d4f4-9633-42a2-a5d6-9e1df2655146",
}
interface_context_diagram_uuids: dict[str, str] = {
"Left to right": "3ef23099-ce9a-4f7d-812f-935f47e7938d",
diff --git a/docs/index.md b/docs/index.md
index ec46073..7387b84 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -192,6 +192,20 @@ Available via `.context_diagram` on a [`ModelObject`][capellambse.model.ModelObj
Context of Maintain Switch Firmware [PDFB]
+- ??? example "[`pa.PhysicalPort`][capellambse.metamodel.cs.PhysicalPort] (PAB)"
+
+ ``` py
+ import capellambse
+
+ model = capellambse.MelodyModel("tests/data/ContextDiagram.aird")
+ diag = model.by_uuid("c403d4f4-9633-42a2-a5d6-9e1df2655146").context_diagram
+ diag.render("svgdiagram").save(pretty=True)
+ ```
+
+
#### Hierarchy in diagrams
Hierarchical diagrams are diagrams where boxes have child boxes and edges
diff --git a/mkdocs.yml b/mkdocs.yml
index bba8e99..cb4d50f 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -99,6 +99,8 @@ nav:
- Overview: data_flow_view.md
- Cable Tree View:
- Overview: cable_tree.md
+ - Custom Diagram:
+ - Overview: custom_diagram.md
- Extras:
- Filters: extras/filters.md
- Styling: extras/styling.md
diff --git a/src/capellambse_context_diagrams/__init__.py b/src/capellambse_context_diagrams/__init__.py
index df27619..06b1f78 100644
--- a/src/capellambse_context_diagrams/__init__.py
+++ b/src/capellambse_context_diagrams/__init__.py
@@ -63,10 +63,12 @@ def init() -> None:
"""Initialize the extension."""
register_classes()
register_interface_context()
+ register_physical_port_context()
register_tree_view()
register_realization_view()
register_data_flow_view()
register_cable_tree_view()
+ register_custom_diagram()
# register_functional_context() XXX: Future
@@ -251,6 +253,15 @@ def register_functional_context() -> None:
)
+def register_physical_port_context() -> None:
+ """Add the `context_diagram` attribute to `PhysicalPort`s."""
+ m.set_accessor(
+ cs.PhysicalPort,
+ ATTR_NAME,
+ context.PhysicalPortContextAccessor(DiagramType.PAB.value, {}),
+ )
+
+
def register_tree_view() -> None:
"""Add the ``tree_view`` attribute to ``Class``es."""
m.set_accessor(
@@ -317,3 +328,31 @@ def register_cable_tree_view() -> None:
{},
),
)
+
+
+def register_custom_diagram() -> None:
+ """Add the `custom_diagram` attribute to `ModelObject`s."""
+ supported_classes: list[tuple[type[m.ModelElement], DiagramType]] = [
+ (oa.Entity, DiagramType.OAB),
+ (oa.OperationalActivity, DiagramType.OAB),
+ (oa.OperationalCapability, DiagramType.OCB),
+ (oa.CommunicationMean, DiagramType.OAB),
+ (sa.Mission, DiagramType.MCB),
+ (sa.Capability, DiagramType.MCB),
+ (sa.SystemComponent, DiagramType.SAB),
+ (sa.SystemFunction, DiagramType.SAB),
+ (la.LogicalComponent, DiagramType.LAB),
+ (la.LogicalFunction, DiagramType.LAB),
+ (pa.PhysicalComponent, DiagramType.PAB),
+ (pa.PhysicalFunction, DiagramType.PAB),
+ (cs.PhysicalLink, DiagramType.PAB),
+ (cs.PhysicalPort, DiagramType.PAB),
+ (fa.ComponentExchange, DiagramType.SAB),
+ (information.Class, DiagramType.CDB),
+ ]
+ for class_, dgcls in supported_classes:
+ m.set_accessor(
+ class_,
+ "custom_diagram",
+ context.CustomContextAccessor(dgcls.value, {}),
+ )
diff --git a/src/capellambse_context_diagrams/collectors/custom.py b/src/capellambse_context_diagrams/collectors/custom.py
new file mode 100644
index 0000000..e17c3a5
--- /dev/null
+++ b/src/capellambse_context_diagrams/collectors/custom.py
@@ -0,0 +1,292 @@
+# SPDX-FileCopyrightText: 2022 Copyright DB InfraGO AG and the capellambse-context-diagrams contributors
+# SPDX-License-Identifier: Apache-2.0
+
+"""Collector for the CustomDiagram."""
+
+from __future__ import annotations
+
+import copy
+import typing as t
+
+import capellambse.model as m
+
+from .. import _elkjs, context
+from . import generic, makers
+
+
+def _is_edge(obj: m.ModelElement) -> bool:
+ return hasattr(obj, "source") and hasattr(obj, "target")
+
+
+def _is_port(obj: m.ModelElement) -> bool:
+ return obj.xtype.endswith("Port")
+
+
+class CustomCollector:
+ """Collect the context for a custom diagram."""
+
+ def __init__(
+ self,
+ diagram: context.ContextDiagram,
+ params: dict[str, t.Any],
+ ) -> None:
+ self.diagram = diagram
+ self.target: m.ModelElement = self.diagram.target
+
+ self.boxable_target: m.ModelElement
+ if _is_port(self.target):
+ self.boxable_target = self.target.owner
+ elif _is_edge(self.target):
+ self.boxable_target = self.target.source.owner
+ else:
+ self.boxable_target = self.target
+
+ self.data = makers.make_diagram(diagram)
+ self.params = params
+ self.collection = self.diagram._collect
+ self.boxes: dict[str, _elkjs.ELKInputChild] = {}
+ self.edges: dict[str, _elkjs.ELKInputEdge] = {}
+ self.ports: dict[str, _elkjs.ELKInputPort] = {}
+ self.boxes_to_delete: set[str] = set()
+
+ if self.diagram._display_parent_relation:
+ self.edge_owners: dict[str, str] = {}
+ self.diagram_target_owners = list(
+ generic.get_all_owners(self.boxable_target)
+ )
+ self.common_owners: set[str] = set()
+
+ if self.diagram._unify_edge_direction != "NONE":
+ self.directions: dict[str, bool] = {}
+ self.min_heights: dict[str, dict[str, float]] = {}
+
+ def __call__(self) -> _elkjs.ELKInputData:
+ if _is_port(self.target):
+ self._make_port_and_owner(self.target)
+ else:
+ self._make_target(self.target)
+
+ if target_edge := self.edges.get(self.target.uuid):
+ target_edge.layoutOptions = copy.deepcopy(
+ _elkjs.EDGE_STRAIGHTENING_LAYOUT_OPTIONS
+ )
+
+ if self.diagram._unify_edge_direction == "UNIFORM":
+ self.directions[self.boxable_target.uuid] = False
+
+ for elem in self.collection:
+ self._make_target(elem)
+
+ if self.diagram._display_parent_relation:
+ current = self.boxable_target
+ while (
+ current
+ and self.common_owners
+ and getattr(current, "owner", None) is not None
+ and not isinstance(current.owner, generic.PackageTypes)
+ ):
+ self.common_owners.discard(current.uuid)
+ current = generic.make_owner_box(
+ current, self._make_box, self.boxes, self.boxes_to_delete
+ )
+ self.common_owners.discard(current.uuid)
+ for edge_uuid, box_uuid in self.edge_owners.items():
+ if box := self.boxes.get(box_uuid):
+ box.edges.append(self.edges.pop(edge_uuid))
+
+ self._fix_box_heights()
+ for uuid in self.boxes_to_delete:
+ del self.boxes[uuid]
+ return self._get_data()
+
+ def _get_data(self) -> t.Any:
+ self.data.children = list(self.boxes.values())
+ self.data.edges = list(self.edges.values())
+ return self.data
+
+ def _fix_box_heights(self) -> None:
+ if self.diagram._unify_edge_direction != "NONE":
+ for uuid, min_heights in self.min_heights.items():
+ box = self.boxes[uuid]
+ box.height = max(box.height, sum(min_heights.values()))
+ else:
+ for uuid, min_heights in self.min_heights.items():
+ box = self.boxes[uuid]
+ box.height = max([box.height, *min_heights.values()])
+
+ def _make_target(
+ self, obj: m.ModelElement
+ ) -> _elkjs.ELKInputChild | _elkjs.ELKInputEdge | None:
+ if _is_edge(obj):
+ return self._make_edge_and_ports(obj)
+ return self._make_box(obj, slim_width=self.diagram._slim_center_box)
+
+ def _make_box(
+ self,
+ obj: m.ModelElement,
+ **kwargs: t.Any,
+ ) -> _elkjs.ELKInputChild:
+ if box := self.boxes.get(obj.uuid):
+ return box
+ box = makers.make_box(
+ obj,
+ no_symbol=self.diagram._display_symbols_as_boxes,
+ **kwargs,
+ )
+ self.boxes[obj.uuid] = box
+ if self.diagram._display_unused_ports:
+ for attr in generic.DIAGRAM_TYPE_TO_CONNECTOR_NAMES[
+ self.diagram.type
+ ]:
+ for port in getattr(obj, attr, []):
+ self._make_port_and_owner(port)
+ if self.diagram._display_parent_relation:
+ self.common_owners.add(
+ generic.make_owner_boxes(
+ obj,
+ self.diagram_target_owners,
+ self._make_box,
+ self.boxes,
+ self.boxes_to_delete,
+ )
+ )
+ return box
+
+ def _make_edge_and_ports(
+ self,
+ edge_obj: m.ModelElement,
+ ) -> _elkjs.ELKInputEdge | None:
+ if self.edges.get(edge_obj.uuid):
+ return None
+ src_obj = edge_obj.source
+ tgt_obj = edge_obj.target
+ src_owner = src_obj.owner
+ tgt_owner = tgt_obj.owner
+ src_owners = list(generic.get_all_owners(src_obj))
+ tgt_owners = list(generic.get_all_owners(tgt_obj))
+ if self.diagram._hide_direct_children and (
+ self.boxable_target.uuid in src_owners
+ or self.boxable_target.uuid in tgt_owners
+ ):
+ return None
+ if self.diagram._display_parent_relation:
+ common_owner = None
+ for owner in src_owners:
+ if owner in tgt_owners:
+ common_owner = owner
+ break
+ if common_owner:
+ self.edge_owners[edge_obj.uuid] = common_owner
+
+ if self._need_switch(
+ src_owners, tgt_owners, src_owner.uuid, tgt_owner.uuid
+ ):
+ src_obj, tgt_obj = tgt_obj, src_obj
+ src_owner, tgt_owner = tgt_owner, src_owner
+
+ if not self.ports.get(src_obj.uuid):
+ port = self._make_port_and_owner(src_obj)
+ self._update_min_heights(src_owner.uuid, "right", port)
+ if not self.ports.get(tgt_obj.uuid):
+ port = self._make_port_and_owner(tgt_obj)
+ self._update_min_heights(tgt_owner.uuid, "left", port)
+
+ edge = _elkjs.ELKInputEdge(
+ id=edge_obj.uuid,
+ sources=[src_obj.uuid],
+ targets=[tgt_obj.uuid],
+ labels=makers.make_label(
+ edge_obj.name,
+ ),
+ )
+ self.edges[edge_obj.uuid] = edge
+ return edge
+
+ def _update_min_heights(
+ self, owner_uuid: str, side: str, port: _elkjs.ELKInputPort
+ ) -> None:
+ self.min_heights.setdefault(owner_uuid, {"left": 0.0, "right": 0.0})[
+ side
+ ] += makers.PORT_SIZE + max(
+ 2 * makers.PORT_PADDING,
+ sum(label.height for label in port.labels),
+ )
+
+ def _need_switch(
+ self,
+ src_owners: list[str],
+ tgt_owners: list[str],
+ src_uuid: str,
+ tgt_uuid: str,
+ ) -> bool:
+ if self.diagram._unify_edge_direction == "SMART":
+ if src_uuid != self.boxable_target.uuid:
+ src_uncommon = [
+ owner for owner in src_owners if owner not in tgt_owners
+ ][-1]
+ src_dir = self.directions.setdefault(src_uncommon, False)
+ else:
+ src_dir = None
+ if tgt_uuid != self.boxable_target.uuid:
+ tgt_uncommon = [
+ owner for owner in tgt_owners if owner not in src_owners
+ ][-1]
+ tgt_dir = self.directions.setdefault(tgt_uncommon, True)
+ else:
+ tgt_dir = None
+ if (src_dir is True) or (tgt_dir is False):
+ return True
+ elif self.diagram._unify_edge_direction == "UNIFORM":
+ src_dir = self.directions.get(src_uuid)
+ tgt_dir = self.directions.get(tgt_uuid)
+ if (src_dir is None) and (tgt_dir is None):
+ self.directions[src_uuid] = False
+ self.directions[tgt_uuid] = True
+ elif src_dir is None:
+ self.directions[src_uuid] = not tgt_dir
+ elif tgt_dir is None:
+ self.directions[tgt_uuid] = not src_dir
+ if self.directions[src_uuid]:
+ return True
+ elif self.diagram._unify_edge_direction == "TREE":
+ src_dir = self.directions.get(src_uuid)
+ tgt_dir = self.directions.get(tgt_uuid)
+ if (src_dir is None) and (tgt_dir is None):
+ self.directions[src_uuid] = True
+ self.directions[tgt_uuid] = True
+ elif src_dir is None:
+ self.directions[src_uuid] = True
+ return True
+ elif tgt_dir is None:
+ self.directions[tgt_uuid] = True
+ return False
+
+ def _make_port_and_owner(
+ self, port_obj: m.ModelElement
+ ) -> _elkjs.ELKInputPort:
+ owner_obj = port_obj.owner
+ box = self._make_box(
+ owner_obj,
+ layout_options=makers.DEFAULT_LABEL_LAYOUT_OPTIONS,
+ )
+ if port := self.ports.get(port_obj.uuid):
+ return port
+ port = makers.make_port(port_obj.uuid)
+ if self.diagram._display_port_labels:
+ text = port_obj.name or "UNKNOWN"
+ port.labels = makers.make_label(text)
+ _plp = self.diagram._port_label_position
+ if not (plp := getattr(_elkjs.PORT_LABEL_POSITION, _plp, None)):
+ raise ValueError(f"Invalid port label position '{_plp}'.")
+ assert isinstance(plp, _elkjs.PORT_LABEL_POSITION)
+ box.layoutOptions["portLabels.placement"] = plp.name
+ box.ports.append(port)
+ self.ports[port_obj.uuid] = port
+ return port
+
+
+def collector(
+ diagram: context.ContextDiagram, params: dict[str, t.Any]
+) -> _elkjs.ELKInputData:
+ """Collect data for rendering a custom diagram."""
+ return CustomCollector(diagram, params)()
diff --git a/src/capellambse_context_diagrams/collectors/default.py b/src/capellambse_context_diagrams/collectors/default.py
index 5e2babe..d54acd0 100644
--- a/src/capellambse_context_diagrams/collectors/default.py
+++ b/src/capellambse_context_diagrams/collectors/default.py
@@ -69,7 +69,6 @@ def process_context(self):
):
box = self._make_box(
self.diagram.target.owner,
- no_symbol=self.diagram._display_symbols_as_boxes,
layout_options=makers.DEFAULT_LABEL_LAYOUT_OPTIONS,
)
box.children = [self.centerbox]
@@ -85,9 +84,11 @@ def process_context(self):
and hasattr(current, "owner")
and not isinstance(current.owner, generic.PackageTypes)
):
- current = self._make_owner_box(
- self.diagram,
+ current = generic.make_owner_box(
current,
+ self._make_box,
+ self.global_boxes,
+ self.boxes_to_delete,
)
self.common_owners.discard(current.uuid)
@@ -153,8 +154,8 @@ def _process_exchanges(
list[generic.ExchangeData],
]:
inc, out = port_collector(self.diagram.target, self.diagram.type)
- inc_c = port_exchange_collector(inc)
- out_c = port_exchange_collector(out)
+ inc_c = port_exchange_collector(inc.values())
+ out_c = port_exchange_collector(out.values())
inc_exchanges = list(chain.from_iterable(inc_c.values()))
out_exchanges = list(chain.from_iterable(out_c.values()))
port_spread: dict[str, int] = {}
@@ -198,7 +199,7 @@ def _process_exchanges(
is_inc = tgt.parent == self.diagram.target
is_out = src.parent == self.diagram.target
if is_inc and is_out:
- pass
+ pass # Support cycles
elif (is_out and (port_spread.get(tgt_owner, 0) > 0)) or (
is_inc and (port_spread.get(src_owner, 0) <= 0)
):
@@ -208,7 +209,7 @@ def _process_exchanges(
except AttributeError:
continue
- ports = inc + out
+ ports = list((inc | out).values())
if not self.diagram._display_unused_ports:
ports = [
p for p in ports if (inc_c.get(p.uuid) or out_c.get(p.uuid))
@@ -251,22 +252,21 @@ def _process_ports(self) -> None:
box = self._make_box(
owner,
height=height,
- no_symbol=self.diagram._display_symbols_as_boxes,
)
box.ports = local_port_objs
box.layoutOptions["portLabels.placement"] = "OUTSIDE"
if self.diagram._display_parent_relation:
- current = owner
- while (
- current
- and current.uuid not in self.diagram_target_owners
- and getattr(current, "owner", None) is not None
- and not isinstance(current.owner, generic.PackageTypes)
- ):
- current = self._make_owner_box(self.diagram, current)
- self.common_owners.add(current.uuid)
+ self.common_owners.add(
+ generic.make_owner_boxes(
+ owner,
+ self.diagram_target_owners,
+ self._make_box,
+ self.global_boxes,
+ self.boxes_to_delete,
+ )
+ )
def _make_port(
self, port_obj: t.Any
@@ -287,38 +287,17 @@ def _make_box(
obj: t.Any,
**kwargs: t.Any,
) -> _elkjs.ELKInputChild:
+ if box := self.global_boxes.get(obj.uuid):
+ return box
box = makers.make_box(
obj,
+ no_symbol=self.diagram._display_symbols_as_boxes,
**kwargs,
)
self.global_boxes[obj.uuid] = box
self.made_boxes[obj.uuid] = box
return box
- def _make_owner_box(
- self,
- diagram: context.ContextDiagram,
- obj: t.Any,
- ) -> t.Any:
- if not (parent_box := self.global_boxes.get(obj.owner.uuid)):
- parent_box = self._make_box(
- obj.owner,
- no_symbol=diagram._display_symbols_as_boxes,
- layout_options=makers.DEFAULT_LABEL_LAYOUT_OPTIONS,
- )
- assert (obj_box := self.global_boxes.get(obj.uuid))
- for box in (children := parent_box.children):
- if box.id == obj.uuid:
- box = obj_box
- break
- else:
- children.append(obj_box)
- for label in parent_box.labels:
- label.layoutOptions = makers.DEFAULT_LABEL_LAYOUT_OPTIONS
-
- self.boxes_to_delete.add(obj.uuid)
- return obj.owner
-
def collector(
diagram: context.ContextDiagram, params: dict[str, t.Any] | None = None
@@ -339,13 +318,13 @@ def collector(
def port_collector(
target: m.ModelElement | m.ElementList, diagram_type: DT
-) -> tuple[list[m.ModelElement], list[m.ModelElement]]:
+) -> tuple[dict[str, m.ModelElement], dict[str, m.ModelElement]]:
"""Collect ports from `target` savely."""
def __collect(target):
port_types = fa.FunctionPort | fa.ComponentPort | cs.PhysicalPort
- incoming_ports: list[m.ModelElement] = []
- outgoing_ports: list[m.ModelElement] = []
+ incoming_ports: dict[str, m.ModelElement] = {}
+ outgoing_ports: dict[str, m.ModelElement] = {}
for attr in generic.DIAGRAM_TYPE_TO_CONNECTOR_NAMES[diagram_type]:
try:
ports = getattr(target, attr)
@@ -353,27 +332,27 @@ def __collect(target):
continue
if attr == "inputs":
- incoming_ports.extend(ports)
+ incoming_ports.update({p.uuid: p for p in ports})
elif attr == "ports":
for port in ports:
if port.direction == "IN":
- incoming_ports.append(port)
+ incoming_ports[port.uuid] = port
else:
- outgoing_ports.append(port)
+ outgoing_ports[port.uuid] = port
else:
- outgoing_ports.extend(ports)
+ outgoing_ports.update({p.uuid: p for p in ports})
except AttributeError:
pass
return incoming_ports, outgoing_ports
if isinstance(target, cabc.Iterable):
assert not isinstance(target, m.ModelElement)
- incoming_ports: list[m.ModelElement] = []
- outgoing_ports: list[m.ModelElement] = []
+ incoming_ports: dict[str, m.ModelElement] = {}
+ outgoing_ports: dict[str, m.ModelElement] = {}
for obj in target:
inc, out = __collect(obj)
- incoming_ports.extend(inc)
- outgoing_ports.extend(out)
+ incoming_ports.update(inc)
+ outgoing_ports.update(out)
else:
incoming_ports, outgoing_ports = __collect(target)
return incoming_ports, outgoing_ports
@@ -479,10 +458,10 @@ def derive_from_functions(
receive special styling in the serialization step.
"""
assert isinstance(diagram.target, cs.Component)
- ports = []
+ ports: list[m.ModelElement] = []
for fnc in diagram.target.allocated_functions:
inc, out = port_collector(fnc, diagram.type)
- ports.extend(inc + out)
+ ports.extend((inc | out).values())
derived_components: dict[str, cs.Component] = {}
for port in ports:
diff --git a/src/capellambse_context_diagrams/collectors/generic.py b/src/capellambse_context_diagrams/collectors/generic.py
index fe09d6f..04225b0 100644
--- a/src/capellambse_context_diagrams/collectors/generic.py
+++ b/src/capellambse_context_diagrams/collectors/generic.py
@@ -267,3 +267,50 @@ def get_all_owners(obj: m.ModelElement) -> cabc.Iterator[str]:
while current is not None:
yield current.uuid
current = getattr(current, "owner", None)
+
+
+def make_owner_box(
+ obj: t.Any,
+ make_box_func: t.Callable,
+ boxes: dict[str, _elkjs.ELKInputChild],
+ boxes_to_delete: set[str],
+) -> t.Any:
+ parent_box = make_box_func(
+ obj.owner,
+ layout_options=makers.DEFAULT_LABEL_LAYOUT_OPTIONS,
+ )
+ assert (obj_box := boxes.get(obj.uuid))
+ for box in (children := parent_box.children):
+ if box.id == obj.uuid:
+ break
+ else:
+ children.append(obj_box)
+ obj_box.width = max(
+ obj_box.width,
+ parent_box.width,
+ )
+ for label in parent_box.labels:
+ label.layoutOptions = makers.DEFAULT_LABEL_LAYOUT_OPTIONS
+ boxes_to_delete.add(obj.uuid)
+ return obj.owner
+
+
+def make_owner_boxes(
+ obj: m.ModelElement,
+ excluded: list[str],
+ make_box_func: t.Callable,
+ boxes: dict[str, _elkjs.ELKInputChild],
+ boxes_to_delete: set[str],
+) -> str:
+ """Create owner boxes for all owners of ``obj``."""
+ current = obj
+ while (
+ current
+ and current.uuid not in excluded
+ and getattr(current, "owner", None) is not None
+ and not isinstance(current.owner, PackageTypes)
+ ):
+ current = make_owner_box(
+ current, make_box_func, boxes, boxes_to_delete
+ )
+ return current.uuid
diff --git a/src/capellambse_context_diagrams/context.py b/src/capellambse_context_diagrams/context.py
index 7bf459b..d2e31d6 100644
--- a/src/capellambse_context_diagrams/context.py
+++ b/src/capellambse_context_diagrams/context.py
@@ -22,6 +22,7 @@
from . import _elkjs, filters, serializers, styling
from .collectors import (
cable_tree,
+ custom,
dataflow_view,
exchanges,
get_elkdata,
@@ -125,6 +126,20 @@ def __get__( # type: ignore
return self._get(obj, FunctionalContextDiagram)
+class PhysicalPortContextAccessor(ContextAccessor):
+ def __get__( # type: ignore
+ self,
+ obj: m.T | None,
+ objtype: type | None = None,
+ ) -> m.Accessor | ContextDiagram:
+ """Make a ContextDiagram for the given model object."""
+ del objtype
+ if obj is None: # pragma: no cover
+ return self
+ assert isinstance(obj, m.ModelElement)
+ return self._get(obj, PhysicalPortContextDiagram)
+
+
class ClassTreeAccessor(ContextAccessor):
"""Provides access to the tree view diagrams."""
@@ -208,6 +223,22 @@ def __get__( # type: ignore
return self._get(obj, CableTreeViewDiagram)
+class CustomContextAccessor(ContextAccessor):
+ """Provides access to the custom context diagrams."""
+
+ def __get__( # type: ignore
+ self,
+ obj: m.T | None,
+ objtype: type | None = None,
+ ) -> m.Accessor | ContextDiagram:
+ """Make a CustomDiagram for the given model object."""
+ del objtype
+ if obj is None: # pragma: no cover
+ return self
+ assert isinstance(obj, m.ModelElement)
+ return self._get(obj, CustomDiagram)
+
+
class ContextDiagram(m.AbstractDiagram):
"""An automatically generated context diagram.
@@ -854,6 +885,82 @@ def name(self) -> str:
return f"Cable Tree View of {self.target.name}"
+class CustomDiagram(ContextDiagram):
+ """An automatically generated CustomDiagram Diagram."""
+
+ _collect: cabc.Iterator[m.ModelElement]
+ _unify_edge_direction: str
+
+ def __init__(
+ self,
+ class_: str,
+ obj: m.ModelElement,
+ *,
+ render_styles: dict[str, styling.Styler] | None = None,
+ default_render_parameters: dict[str, t.Any],
+ ) -> None:
+ default_render_parameters = {
+ "collect": [],
+ "slim_center_box": False,
+ "unify_edge_direction": str,
+ } | default_render_parameters
+ super().__init__(
+ class_,
+ obj,
+ render_styles=render_styles,
+ default_render_parameters=default_render_parameters,
+ )
+ self.collector = custom.collector
+
+ @property
+ def name(self) -> str:
+ return f"Custom Context of {self.target.name.replace('/', '- or -')}"
+
+
+class PhysicalPortContextDiagram(CustomDiagram):
+ """A custom Context Diagram exclusively for PhysicalPorts."""
+
+ def __init__(
+ self,
+ class_: str,
+ obj: m.ModelElement,
+ *,
+ render_styles: dict[str, styling.Styler] | None = None,
+ default_render_parameters: dict[str, t.Any],
+ ) -> None:
+ visited = set()
+
+ def _collector(
+ target: m.ModelElement,
+ ) -> cabc.Iterator[m.ModelElement]:
+ if target.uuid in visited:
+ return
+ visited.add(target.uuid)
+ for link in target.links:
+ yield link
+ yield from _collector(link.source)
+ yield from _collector(link.target)
+
+ default_render_parameters = {
+ "collect": _collector(obj),
+ "display_parent_relation": True,
+ "unify_edge_direction": "UNIFORM",
+ "display_port_labels": True,
+ "port_label_position": _elkjs.PORT_LABEL_POSITION.OUTSIDE.name,
+ } | default_render_parameters
+
+ super().__init__(
+ class_,
+ obj,
+ render_styles=render_styles,
+ default_render_parameters=default_render_parameters,
+ )
+
+ @property
+ def name(self) -> str:
+ return f"Context of {self.target.name.replace('/', '- or -')}"
+
+
def try_to_layout(data: _elkjs.ELKInputData) -> _elkjs.ELKOutputData:
"""Try calling elkjs, raise a JSONDecodeError if it fails."""
try:
diff --git a/tests/test_context_diagrams.py b/tests/test_context_diagrams.py
index 32a2c31..9a978ac 100644
--- a/tests/test_context_diagrams.py
+++ b/tests/test_context_diagrams.py
@@ -26,6 +26,7 @@
TEST_ENTITY_UUID = "e37510b9-3166-4f80-a919-dfaac9b696c7"
TEST_SYS_FNC_UUID = "a5642060-c9cc-4d49-af09-defaa3024bae"
TEST_DERIVATION_UUID = "4ec45aec-0d6a-411a-80ee-ebd3c1a53d2c"
+TEST_PHYSICAL_PORT_UUID = "c403d4f4-9633-42a2-a5d6-9e1df2655146"
@pytest.mark.parametrize(
@@ -53,13 +54,13 @@
"c78b5d7c-be0c-4ed4-9d12-d447cb39304e",
id="PhysicalBehaviorComponent",
),
+ pytest.param(TEST_PHYSICAL_PORT_UUID, id="PhysicalPort"),
],
)
def test_context_diagrams(model: capellambse.MelodyModel, uuid: str) -> None:
obj = model.by_uuid(uuid)
diag = obj.context_diagram
- diag.render(None, display_parent_relation=True)
diag.render(None, display_parent_relation=False)
assert diag.nodes