diff --git a/cimgen/cimgen.py b/cimgen/cimgen.py
index dfea746..3f8cd79 100644
--- a/cimgen/cimgen.py
+++ b/cimgen/cimgen.py
@@ -20,6 +20,8 @@ def asJson(self):
jsonObject = {}
if self.about() is not None:
jsonObject["about"] = self.about()
+ if self.namespace() is not None:
+ jsonObject["namespace"] = self.namespace()
if self.comment() is not None:
jsonObject["comment"] = self.comment()
if self.dataType() is not None:
@@ -51,6 +53,12 @@ def about(self):
else:
return None
+ def namespace(self):
+ if "$rdf:about" in self.jsonDefinition:
+ return self.jsonDefinition["$rdf:about"][: -len(self.about())]
+ else:
+ return None
+
# Capitalized True/False is valid in python but not in json.
# Do not use this function in combination with json.load()
def is_used(self) -> bool:
@@ -211,6 +219,7 @@ def __init__(self, rdfsEntry):
self.super = rdfsEntry.subClassOf()
self.subclasses = []
self.stereotype = rdfsEntry.stereotype()
+ self.namespace = rdfsEntry.namespace()
def attributes(self):
return self.attribute_list
@@ -405,6 +414,13 @@ def _parse_rdf(input_dic, version): # NOSONAR
for instance in enum_instances:
clarse = _get_rid_of_hash(instance["type"])
if clarse and clarse in classes_map:
+ if instance.get("comment"):
+ instance["wrapped_comment"] = wrap_and_clean(
+ instance["comment"],
+ width=100,
+ initial_indent="# ",
+ subsequent_indent=(" # "),
+ )
classes_map[clarse].add_enum_instance(instance)
else:
logger.info("Class {} for enum instance {} not found.".format(clarse, instance))
@@ -429,6 +445,7 @@ def _write_python_files(elem_dict, lang_pack, output_path, version):
"class_location": lang_pack.get_class_location(class_name, elem_dict, version),
"class_name": class_name,
"class_origin": elem_dict[class_name].origins(),
+ "class_namespace": _get_namespace(elem_dict[class_name].namespace),
"enum_instances": elem_dict[class_name].enum_instances(),
"is_an_enum_class": elem_dict[class_name].is_an_enum_class(),
"is_a_primitive_class": elem_dict[class_name].is_a_primitive_class(),
@@ -467,6 +484,7 @@ def _write_python_files(elem_dict, lang_pack, output_path, version):
attribute["is_primitive_attribute"] = _get_bool_string(attribute_type == "primitive")
attribute["is_datatype_attribute"] = _get_bool_string(attribute_type == "datatype")
attribute["attribute_class"] = attribute_class
+ attribute["attribute_namespace"] = _get_namespace(attribute["namespace"])
class_details["attributes"].sort(key=lambda d: d["label"])
_write_files(class_details, output_path, version)
@@ -749,6 +767,14 @@ def _get_attribute_type(attribute: dict, class_infos: CIMComponentDefinition) ->
return attribute_type
+def _get_namespace(parsed_namespace: str) -> str:
+ if parsed_namespace == "#":
+ namespace = cim_namespace
+ else:
+ namespace = parsed_namespace
+ return namespace
+
+
def _get_bool_string(bool_value: bool) -> str:
"""Convert boolean value into a string which is usable in both Python and Json.
diff --git a/cimgen/languages/modernpython/lang_pack.py b/cimgen/languages/modernpython/lang_pack.py
index 64cb410..58c5c1f 100644
--- a/cimgen/languages/modernpython/lang_pack.py
+++ b/cimgen/languages/modernpython/lang_pack.py
@@ -98,14 +98,6 @@ def _get_python_type(datatype):
return "float"
-def _set_imports(attributes):
- import_set = set()
- for attribute in attributes:
- if attribute["is_datatype_attribute"] or attribute["is_primitive_attribute"]:
- import_set.add(attribute["attribute_class"])
- return sorted(import_set)
-
-
def _set_datatype_attributes(attributes) -> dict:
datatype_attributes = {}
datatype_attributes["python_type"] = "None"
@@ -136,13 +128,20 @@ def run_template(output_path, class_details):
template = class_template_file
class_details["setDefault"] = _set_default
class_details["setType"] = _set_type
- class_details["imports"] = _set_imports(class_details["attributes"])
resource_file = _create_file(output_path, class_details, template)
_write_templated_file(resource_file, class_details, template["filename"])
def _create_file(output_path, class_details, template) -> str:
- resource_file = Path(output_path) / "resources" / (class_details["class_name"] + template["ext"])
+ if (
+ class_details["is_a_primitive_class"]
+ or class_details["is_a_datatype_class"]
+ or class_details["is_an_enum_class"]
+ ):
+ class_category = "types"
+ else:
+ class_category = ""
+ resource_file = Path(output_path) / "resources" / class_category / (class_details["class_name"] + template["ext"])
resource_file.parent.mkdir(exist_ok=True)
return str(resource_file)
diff --git a/cimgen/languages/modernpython/templates/class_template.mustache b/cimgen/languages/modernpython/templates/class_template.mustache
index 5956170..da6ee72 100644
--- a/cimgen/languages/modernpython/templates/class_template.mustache
+++ b/cimgen/languages/modernpython/templates/class_template.mustache
@@ -10,9 +10,6 @@ from pydantic.dataclasses import dataclass
from ..utils.profile import BaseProfile, Profile
from {{class_location}} import {{sub_class_of}}
-{{#imports}}
-from .{{.}} import {{.}}
-{{/imports}}
@dataclass
@@ -35,16 +32,17 @@ class {{class_name}}({{sub_class_of}}):
{{/attr_origin}}
],
"is_used": {{#is_used}}True{{/is_used}}{{^is_used}}False{{/is_used}},
+ "namespace": "{{attribute_namespace}}", # NOSONAR
"is_class_attribute": {{#is_class_attribute}}True{{/is_class_attribute}}{{^is_class_attribute}}False{{/is_class_attribute}},
"is_datatype_attribute": {{#is_datatype_attribute}}True{{/is_datatype_attribute}}{{^is_datatype_attribute}}False{{/is_datatype_attribute}},
"is_enum_attribute": {{#is_enum_attribute}}True{{/is_enum_attribute}}{{^is_enum_attribute}}False{{/is_enum_attribute}},
"is_list_attribute": {{#is_list_attribute}}True{{/is_list_attribute}}{{^is_list_attribute}}False{{/is_list_attribute}},
"is_primitive_attribute": {{#is_primitive_attribute}}True{{/is_primitive_attribute}}{{^is_primitive_attribute}}False{{/is_primitive_attribute}},
{{#is_datatype_attribute}}
- "attribute_class": {{attribute_class}},
+ "attribute_class": "{{attribute_class}}",
{{/is_datatype_attribute}}
{{#is_primitive_attribute}}
- "attribute_class": {{attribute_class}},
+ "attribute_class": "{{attribute_class}}",
{{/is_primitive_attribute}}
},
)
diff --git a/cimgen/languages/modernpython/templates/datatype_template.mustache b/cimgen/languages/modernpython/templates/datatype_template.mustache
index 6d69021..a6d4d6a 100644
--- a/cimgen/languages/modernpython/templates/datatype_template.mustache
+++ b/cimgen/languages/modernpython/templates/datatype_template.mustache
@@ -2,8 +2,8 @@
Generated from the CGMES files via cimgen: https://github.com/sogno-platform/cimgen
"""
-from ..utils.datatypes import CIMDatatype
-from ..utils.profile import Profile
+from ...utils.datatypes import CIMDatatype
+from ...utils.profile import Profile
{{#isFixed_imports}}
from .{{.}} import {{.}}
{{/isFixed_imports}}
diff --git a/cimgen/languages/modernpython/templates/enum_template.mustache b/cimgen/languages/modernpython/templates/enum_template.mustache
index d0b3826..1c51ff6 100644
--- a/cimgen/languages/modernpython/templates/enum_template.mustache
+++ b/cimgen/languages/modernpython/templates/enum_template.mustache
@@ -7,9 +7,10 @@ from enum import Enum
class {{class_name}}(str, Enum):
"""
- {{{class_comment}}} # noqa: E501
+ {{{wrapped_class_comment}}}
"""
{{#enum_instances}}
- {{label}} = "{{label}}"{{#comment}} # {{comment}}{{/comment}} # noqa: E501
+ {{label}} = "{{label}}"{{#comment}}
+ {{wrapped_comment}}{{/comment}}
{{/enum_instances}}
diff --git a/cimgen/languages/modernpython/templates/primitive_template.mustache b/cimgen/languages/modernpython/templates/primitive_template.mustache
index be44ab0..2a1a758 100644
--- a/cimgen/languages/modernpython/templates/primitive_template.mustache
+++ b/cimgen/languages/modernpython/templates/primitive_template.mustache
@@ -3,8 +3,8 @@ Generated from the CGMES files via cimgen: https://github.com/sogno-platform/cim
"""
from datetime import date, datetime, time
-from ..utils.datatypes import Primitive
-from ..utils.profile import Profile
+from ...utils.datatypes import Primitive
+from ...utils.profile import Profile
{{class_name}} = Primitive(
name="{{class_name}}",
diff --git a/cimgen/languages/modernpython/utils/base.py b/cimgen/languages/modernpython/utils/base.py
index f6f7570..1e5dc8c 100644
--- a/cimgen/languages/modernpython/utils/base.py
+++ b/cimgen/languages/modernpython/utils/base.py
@@ -1,8 +1,9 @@
# Drop in dataclass replacement, allowing easier json dump and validation in the future.
import importlib
+from lxml import etree
from dataclasses import Field, fields
from functools import cached_property
-from typing import Any, TypeAlias, TypedDict
+from typing import Any, TypeAlias, TypedDict, Optional
from pydantic.dataclasses import dataclass
@@ -89,6 +90,24 @@ def apparent_name(cls) -> str:
"""
return cls.__name__
+ def get_attribute_main_profile(self, attr: str) -> BaseProfile | None:
+ """Get the profile for this attribute of the CIM object.
+
+ This function searches for the profile of an attribute for the CIM type of an object.
+ If the main profile of the type is a possible profile of the attribute it should be choosen.
+ Otherwise, the first profile in the list of possible profiles ordered by profile number.
+
+ :param attr: Attribute to check
+ :return: Attribute profile.
+ """
+ attr_profiles_map = self.possible_attribute_profiles
+ profiles = attr_profiles_map.get(attr, [])
+ if self.recommended_profile in profiles:
+ return self.recommended_profile
+ if profiles:
+ return sorted(profiles)[0]
+ return None
+
def cgmes_attribute_names_in_profile(self, profile: BaseProfile | None) -> set[Field]:
"""
Returns all fields accross the parent tree which are in the profile in parameter.
@@ -133,38 +152,217 @@ def cgmes_attributes_in_profile(self, profile: BaseProfile | None) -> dict[str,
for f in fields(parent):
shortname = f.name
qualname = f"{parent.apparent_name()}.{shortname}" # type: ignore
+ infos = dict()
+
if f not in self.cgmes_attribute_names_in_profile(profile) or shortname in seen_attrs:
- # Wrong profile or already found from a parent.
continue
- else:
- # Namespace finding
- # "class namespace" means the first namespace defined in the inheritance tree.
- # This can go up to Base, which will give the default cim NS.
- if (extra := getattr(f.default, "json_schema_extra", None)) is None:
- # The attribute does not have extra metadata. It might be a custom atttribute
- # without it, or a base type (int...).
- # Use the class namespace.
- namespace = self.namespace
- elif (attr_ns := extra.get("namespace", None)) is None:
- # The attribute has some extras, but not namespace.
- # Use the class namespace.
- namespace = self.namespace
- else:
+
+ # Namespace finding
+ # "class namespace" means the first namespace defined in the inheritance tree.
+ # This can go up to Base, which will give the default cim NS.
+ infos["namespace"] = self.namespace
+ extra = getattr(f.default, "json_schema_extra", None)
+ if extra and extra.get("is_used"):
+ # adding the extras, used for xml generation
+ extra_info = {
+ "attr_name": qualname,
+ "is_class_attribute": extra.get("is_class_attribute"),
+ "is_enum_attribute": extra.get("is_enum_attribute"),
+ "is_list_attribute": extra.get("is_list_attribute"),
+ "is_primitive_attribute": extra.get("is_primitive_attribute"),
+ "is_datatype_attribute": extra.get("is_datatype_attribute"),
+ "attribute_class": extra.get("attribute_class"),
+ "attribute_main_profile": self.get_attribute_main_profile(shortname),
+ }
+ if extra.get("namespace"):
# The attribute has an explicit namesapce
- namespace = attr_ns
+ extra_info["namespace"] = extra.get("namespace", self.namespace)
+ infos.update(extra_info)
- qual_attrs[qualname] = CgmesAttribute(
- value=getattr(self, shortname),
- namespace=namespace,
- )
- seen_attrs.add(shortname)
+ infos["value"] = getattr(self, shortname)
+
+ qual_attrs[qualname] = CgmesAttribute(infos)
+ seen_attrs.add(shortname)
return qual_attrs
+ def to_xml(self, profile_to_export: BaseProfile, id: Optional[str] = None) -> Optional[etree.Element]:
+ """Creates an etree element of self with all non-empty attributes of the profile_to_export
+ that are not already defined in the recommanded profile
+ This can then be used to generate the xml file of the profile_to_export
+ Args:
+ profile_to_export (Profile): Profile for which we want to obtain the xml tree (eg. Profile.EQ)
+ id (Optional[str], optional): "mRID", optional: some objects don't have mRID attribute. Defaults to None.
+ Returns:
+ Optional[etree.Element]: etree describing self for the profile_to_export, None if nothing to export
+ """
+ profile_attributes = self._get_attributes_to_export(profile_to_export)
+
+ if "mRID" in self.to_dict():
+ obj_id = self.mRID
+ else:
+ obj_id = id
+
+ # if no attribute to export or no mRID, return None
+ if profile_attributes == {} or obj_id is None:
+ root = None
+ else:
+ # Create root element
+ nsmap = NAMESPACES
+ root = etree.Element("{" + self.namespace + "}" + self.resource_name, nsmap=nsmap)
+
+ # Add the ID as attribute to the root
+ if self.recommended_profile.value == profile_to_export.value:
+ root.set(f"""{{{nsmap["rdf"]}}}""" + "ID", obj_id)
+ else:
+ root.set(f"""{{{nsmap["rdf"]}}}""" + "about", "#" + obj_id)
+
+ root = self._add_attribute_to_etree(attributes=profile_attributes, root=root, nsmap=nsmap)
+ return root
+
+ def _get_attributes_to_export(self, profile_to_export: BaseProfile) -> dict:
+ attributes_to_export = {}
+ attributes_in_profile = self.cgmes_attributes_in_profile(profile_to_export)
+ for key, attribute in attributes_in_profile.items():
+ if attribute["attribute_main_profile"] == profile_to_export:
+ attributes_to_export[key] = attribute
+ attributes_to_export = self._remove_empty_attributes(attributes_to_export)
+ return attributes_to_export
+
+ @staticmethod
+ def _remove_empty_attributes(attributes: dict) -> dict:
+ for key, attribute in list(attributes.items()):
+ # Remove empty attributes
+ if attribute["value"] in [None, "", []]:
+ del attributes[key]
+ elif attribute.get("attribute_class") and attribute["attribute_class"] == "Boolean":
+ attribute["value"] = str(attribute["value"]).lower()
+ return attributes
+
+ @staticmethod
+ def _add_attribute_to_etree(attributes: dict, root: etree.Element, nsmap: dict) -> etree.Element:
+ rdf_namespace = f"""{{{nsmap["rdf"]}}}"""
+ for field_name, attribute in attributes.items():
+ # add all attributes relevant to the profile as SubElements
+ attr_namespace = attribute["namespace"]
+ element_name = f"{{{attr_namespace}}}{field_name}"
+
+ if attribute["is_class_attribute"]:
+ # class_attributes are exported as rdf: resource #mRID_of_target
+ element = etree.SubElement(root, element_name)
+ element.set(rdf_namespace + "resource", "#" + attribute["value"])
+ elif attribute["is_enum_attribute"]:
+ element = etree.SubElement(root, element_name)
+ element.set(rdf_namespace + "resource", nsmap["cim"] + attribute["value"])
+ else:
+ element = etree.SubElement(root, element_name)
+ element.text = str(attribute["value"])
+ return root
+
def __str__(self) -> str:
"""Returns the string representation of this resource."""
return "\n".join([f"{k}={v}" for k, v in sorted(self.to_dict().items())])
+ def _parse_xml_fragment(self, xml_fragment: str) -> dict:
+ """parses an xml fragment into a dict defining the class attributes
+
+ Args:
+ xml_fragment (str): xml string defining an instance of the current class
+
+ Returns:
+ attribute_dict (dict): a dictionnary of attributes to create/update the class instance
+ """
+ attribute_dict = {}
+ xml_tree = etree.fromstring(xml_fragment)
+
+ # raise an error if the xml does not describe the expected class
+ if not xml_tree.tag.endswith(self.resource_name):
+ raise (KeyError(f"The fragment does not correspond to the class {self.resource_name}"))
+
+ attribute_dict.update(self._extract_mrid_from_etree(xml_tree=xml_tree))
+
+ # parsing attributes defined in class
+ class_attributes = self.cgmes_attributes_in_profile(None)
+ for key, class_attribute in class_attributes.items():
+ xml_attribute = xml_tree.findall(".//{*}" + key)
+ if len(xml_attribute) != 1:
+ continue
+ xml_attribute = xml_attribute[0]
+ attr = key.rsplit(".")[-1]
+
+ attr_value = self._extract_attr_value_from_etree(attr, class_attribute, xml_attribute)
+ if hasattr(self, attr) and attr_value is not None:
+ attribute_dict[attr] = attr_value
+
+ return attribute_dict
+
+ def _extract_mrid_from_etree(self, xml_tree: etree.Element) -> dict:
+ """Parsing the mRID from etree attributes"""
+ mrid_dict = {}
+ for key, value in xml_tree.attrib.items():
+ if key.endswith("ID") or key.endswith("about"):
+ if value.startswith("#"):
+ value = value[1:]
+ if hasattr(self, "mRID") and value is not None:
+ mrid_dict = {"mRID": value}
+ return mrid_dict
+
+ def _extract_attr_value_from_etree(self, attr_name: str, class_attribute: dict, xml_attribute: etree.Element):
+ """Parsing the attribute value from etree attributes"""
+ attr_value = None
+ # class attributes are pointing to another class/instance defined in .attrib
+ if class_attribute["is_class_attribute"] and len(xml_attribute.keys()) == 1:
+ attr_value = xml_attribute.values()[0]
+ if attr_value.startswith("#"):
+ attr_value = attr_value[1:]
+
+ # enum attributes are defined in .attrib and has a prefix ending in "#"
+ elif class_attribute["is_enum_attribute"] and len(xml_attribute.keys()) == 1:
+ attr_value = xml_attribute.values()[0]
+ if "#" in attr_value:
+ attr_value = attr_value.split("#")[-1]
+
+ elif class_attribute["is_list_attribute"]:
+ attr_value = eval(xml_attribute.text)
+ elif class_attribute["is_primitive_attribute"] or class_attribute["is_datatype_attribute"]:
+ attr_value = xml_attribute.text
+ if self.__dataclass_fields__[attr_name].type == bool:
+ attr_value = {"true": True, "false": False}.get(attr_value, None)
+ else:
+ # types are int, float or str (date, time and datetime treated as str)
+ attr_value = self.__dataclass_fields__[attr_name].type(attr_value)
+ else:
+ # other attributes types are defined in .text
+ attr_value = xml_attribute.text
+ return attr_value
+
+ def update_from_xml(self, xml_fragment: str):
+ """
+ Updates the instance by parsing an xml fragment defining the attributes of this instance
+ example: updating the instance by parsing the corresponding fragment from the SSH profile
+ """
+ attribute_dict = self._parse_xml_fragment(xml_fragment)
+
+ if attribute_dict["mRID"] == self.mRID:
+ for key, value in attribute_dict.items():
+ attr = getattr(self, key)
+ if isinstance(attr, list):
+ getattr(self, key).extend(value)
+ else:
+ setattr(self, key, value)
+
+ @classmethod
+ def from_xml(cls, xml_fragment: str):
+ """
+ Returns an instance of the class from an xml fragment defining the attributes written in the form:
+ ...
+ example: creating an instance by parsing a fragment from the EQ profile
+ """
+ attribute_dict = cls()._parse_xml_fragment(xml_fragment)
+
+ # Instantiate the class with the dictionary
+ return cls(**attribute_dict)
+
@staticmethod
def get_extra_prop(field: Field, prop: str) -> Any:
# The field is defined as a pydantic field, not a dataclass field,
diff --git a/cimgen/languages/modernpython/utils/datatypes.py b/cimgen/languages/modernpython/utils/datatypes.py
index db3d6ba..a249c93 100644
--- a/cimgen/languages/modernpython/utils/datatypes.py
+++ b/cimgen/languages/modernpython/utils/datatypes.py
@@ -6,9 +6,9 @@
from .config import cgmes_resource_config
from .profile import BaseProfile
-from ..resources.UnitMultiplier import UnitMultiplier
-from ..resources.UnitSymbol import UnitSymbol
-from ..resources.Currency import Currency
+from ..resources.types.UnitMultiplier import UnitMultiplier
+from ..resources.types.UnitSymbol import UnitSymbol
+from ..resources.types.Currency import Currency
@dataclass(config=cgmes_resource_config)
diff --git a/cimgen/languages/modernpython/utils/reader.py b/cimgen/languages/modernpython/utils/reader.py
new file mode 100644
index 0000000..386ce1c
--- /dev/null
+++ b/cimgen/languages/modernpython/utils/reader.py
@@ -0,0 +1,179 @@
+from lxml import etree
+import importlib
+import logging
+from .profile import Profile
+from pydantic import BaseModel, Field
+from typing import Dict, Optional, Literal
+
+
+logger = logging.getLogger(__name__)
+
+
+class Reader(BaseModel):
+ """Parses profiles to create the corresponding python objects
+
+ Args:
+ cgmes_version_path (str): Path to the cgmes resources folder containing the class definition
+ custom_namespaces (Optional[[str, str]]): {"namespace_prefix": "namespace_uri"}
+ custom_folder (Optional[str]): "path_to_custom_resources_folder"
+ """
+
+ cgmes_version_path: str
+ custom_namespaces: Optional[Dict[str, str]] = None
+ custom_folder: Optional[str] = None
+ logger_grouped: Dict[str, Dict[str, int]] = Field(default_factory=lambda: {"errors": {}, "info": {}})
+ import_result: Dict = Field(default_factory=lambda: {"meta_info": {}, "topology": {}})
+
+ def parse_profiles(self, xml_files: list[str], start_dict: Optional[Dict] = None):
+ """Parses all profiles contained in xml_files and returns a list containing
+ all the objects defined in the profiles "mRID": Object\n
+ Errors encounterd in the parsing can be recovered in Reader.logger_grouped
+
+ Args:
+ xml_files (list): list with the path to all the profiles to parse
+ start_dict (Optional[Dict]): To parse profiles on top of an existing list dict(meta_info, topology)
+
+ Returns:
+ list: ["topology": dict of all the objects defined in the profiles {"mRID": Object}, "meta_info"]
+ """
+ if start_dict is not None:
+ self.import_result = start_dict
+ self.import_result["meta_info"] = dict(namespaces=self._get_namespaces(xml_files[0]), urls={})
+ namespace_rdf = self._get_rdf_namespace()
+
+ bases = ["{" + self.import_result["meta_info"]["namespaces"]["cim"] + "}"]
+ if self.custom_namespaces:
+ for custom_namespace in self.custom_namespaces.values():
+ bases.append("{" + custom_namespace + "}")
+ bases = tuple(bases)
+
+ for xml_file in xml_files:
+ self._instantiate_classes(xml_file=xml_file, bases=bases, namespace_rdf=namespace_rdf)
+ return self.import_result
+
+ def _instantiate_classes(self, xml_file: str, bases: tuple, namespace_rdf: str):
+ """creates/updates the python objects with the information of xml_file
+
+ Args:
+ xml_file (str): Path to the profile
+ bases (tuple): contains the possible namespaces uris defining the classes, can be custom
+ namespace_rdf (str): rdf namespace uri
+ """
+ context = etree.iterparse(xml_file, ("start", "end"))
+ level = 0
+
+ for event, elem in context:
+ if event == "end":
+ level -= 1
+ if event == "start":
+ level += 1
+
+ class_namespace = next((namespace for namespace in bases if elem.tag.startswith(namespace)), None)
+ if event == "start" and class_namespace is not None and level == 2:
+ class_name, uuid = self._extract_classname_uuid(elem, class_namespace, namespace_rdf)
+ if uuid is not None:
+ self._process_element(class_name, uuid, elem)
+ # Check which package is read
+ elif event == "end":
+ self._check_metadata(elem)
+
+ @staticmethod
+ def _extract_classname_uuid(elem, class_namespace: str, namespace_rdf: str) -> tuple:
+ """Extracts class name and instance uuid ("mRID")
+
+ Args:
+ elem (etree.Element): description of the instance for the given profile
+ class_namespace (str): namespace uri defining the class
+ namespace_rdf (str): rdf namespace uri
+
+ Returns:
+ tuple: (class_name: example "ACLineSgement", instance_uuid: "mRID")
+ """
+ class_name = elem.tag[len(class_namespace) :]
+ uuid = elem.get("{%s}ID" % namespace_rdf)
+ if uuid is None:
+ uuid = elem.get("{%s}about" % namespace_rdf)
+ if uuid is not None:
+ uuid = uuid[1:]
+ return class_name, uuid
+
+ def _process_element(self, class_name: str, uuid: str, elem):
+ """Creates or updates (if an object with the same uuid exists)
+ an instance of the class based on the fragment of the profile
+
+ Args:
+ class_name (str): Name of the class of the instance to create/update (example: ACLineSegment)
+ uuid (str): mRID
+ elem (etree.Element): description of the instance for the given profile
+ """
+ topology = self.import_result["topology"]
+ elem_str = etree.tostring(elem, encoding="utf8")
+ try:
+ # Import the module for the CGMES object.
+ module_name = self._get_path_to_module(class_name)
+ module = importlib.import_module(module_name)
+
+ klass = getattr(module, class_name)
+ if uuid not in topology:
+ topology[uuid] = klass().from_xml(elem_str)
+ info_msg = "CIM object {} created".format(module_name.split(".")[-1])
+ else:
+ obj = topology[uuid]
+ obj.update_from_xml(elem_str)
+ info_msg = "CIM object {} updated".format(module_name.split(".")[-1])
+ self._log_message("info", info_msg)
+
+ except ModuleNotFoundError:
+ error_msg = "Module {} not implemented".format(class_name)
+ self._log_message("errors", error_msg)
+ except Exception as e:
+ error_msg = "Could not create/update {}, {}".format(uuid, e)
+ self._log_message("errors", error_msg)
+
+ def _check_metadata(self, elem):
+ if "Model.profile" in elem.tag:
+ for package_key in [e.value for e in Profile]:
+ if package_key in elem.text:
+ break
+ # the author of all imported files should be the same, avoid multiple entries
+ elif "author" not in self.import_result["meta_info"].keys():
+ if any(author_field in elem.tag for author_field in ("Model.createdBy", "Model.modelingAuthoritySet")):
+ self.import_result["meta_info"]["author"] = elem.text
+
+ # Returns a map of class_namespace to namespace for the given XML file.
+ @staticmethod
+ def _get_namespaces(source) -> Dict:
+ namespaces = {}
+ events = ("end", "start-ns", "end-ns")
+ for event, elem in etree.iterparse(source, events):
+ if event == "start-ns":
+ class_namespace, ns = elem
+ namespaces[class_namespace] = ns
+ elif event == "end":
+ break
+
+ # Reset stream
+ if hasattr(source, "seek"):
+ source.seek(0)
+
+ return namespaces
+
+ # Returns the RDF Namespace from the namespaces dictionary
+ def _get_rdf_namespace(self) -> str:
+ try:
+ namespace = self.import_result["meta_info"]["namespaces"]["rdf"]
+ except KeyError:
+ namespace = "http://www.w3.org/1999/02/22-rdf-syntax-ns#" # NOSONAR
+ logger.warning("No rdf namespace found. Using %s" % namespace)
+ return namespace
+
+ def _get_path_to_module(self, class_name: str) -> str:
+ if self.custom_folder and importlib.find_loader(self.custom_folder + "." + class_name):
+ path_to_module = self.custom_folder + "." + class_name
+ else:
+ path_to_module = self.cgmes_version_path + "." + class_name
+ return path_to_module
+
+ def _log_message(self, log_type: Literal["errors", "info"], message: str):
+ self.logger_grouped[log_type].setdefault(message, 0)
+ self.logger_grouped[log_type][message] += 1
diff --git a/cimgen/languages/modernpython/utils/writer.py b/cimgen/languages/modernpython/utils/writer.py
new file mode 100644
index 0000000..036c47e
--- /dev/null
+++ b/cimgen/languages/modernpython/utils/writer.py
@@ -0,0 +1,104 @@
+from lxml import etree
+from pydantic import BaseModel
+from .constants import NAMESPACES
+from .profile import BaseProfile, Profile
+from typing import Dict, Optional
+
+
+class Writer(BaseModel):
+ """Class for writing CIM RDF/XML files
+
+ Args:
+ objects (dict): Mapping of rdfid to CIM object
+ Model_metadata (Optional[Dict[str, str]]): any additional data to add in header
+ default = {"modelingAuthoritySet": "www.sogno.energy" }
+ """
+
+ objects: Dict
+ writer_metadata: Dict[str, str] = {}
+
+ def write(
+ self,
+ outputfile: str,
+ model_id: str,
+ class_profile_map: Dict[str, BaseProfile] = {},
+ custom_namespaces: Dict[str, str] = {},
+ ) -> dict[BaseProfile, str]:
+ """Write CIM RDF/XML files.
+ This function writes CIM objects into one or more RDF/XML files separated by profiles.
+ Each CIM object will be written to its corresponding profile file depending on class_profile_map.
+ But some objects to more than one file if some attribute profiles are not the same as the class profile.
+
+ Args:
+ outputfile (str): Stem of the output file, resulting files: _.xml.
+ model_id (str): Stem of the model IDs, resulting IDs: _.
+ class_profile_map Optional[Dict[str, str]: Mapping of CIM type to profile.
+ custom_namespaces Optional[Dict[str, str]: {"namespace_prefix": "namespace_uri"}
+
+ Returns:
+ Mapping of profile to outputfile.
+ """
+ profile_list: list[BaseProfile] = list(Profile)
+ if class_profile_map:
+ profile_list += {p for p in class_profile_map.values() if p not in profile_list}
+ profile_file_map: dict[BaseProfile, str] = {}
+ for profile in profile_list:
+ profile_name = profile.long_name
+ full_file_name = outputfile + "_" + profile.long_name + ".xml"
+ output = self._generate(profile, model_id + "_" + profile_name, custom_namespaces)
+ if output:
+ output.write(full_file_name, pretty_print=True, xml_declaration=True, encoding="UTF-8")
+ profile_file_map[profile] = full_file_name
+ return profile_file_map
+
+ def _generate(
+ self, profile: BaseProfile, model_id: str, custom_namespaces: Dict[str, str] = {}
+ ) -> Optional[etree.ElementTree]:
+ """Write CIM objects as RDF/XML data to a string.
+
+ This function creates RDF/XML tree corresponding to one profile.
+
+ Args:
+ profile (BaseProfile): Only data for this profile should be written.
+ model_id (str): Stem of the model IDs, resulting IDs: _.
+ custom_namespaces Optional[Dict[str, str]]: {"namespace_prefix": "namespace_uri"}
+
+ Returns:
+ etree of the profile
+ """
+ writer_info = {"modelingAuthoritySet": "www.sogno.energy"}
+ writer_info.update(self.writer_metadata)
+ fullmodel = {
+ "id": model_id,
+ "Model": writer_info,
+ }
+ for uri in profile.uris:
+ fullmodel["Model"].update({"profile": uri})
+
+ nsmap = NAMESPACES
+ nsmap.update(custom_namespaces)
+
+ rdf_namespace = f"""{{{nsmap["rdf"]}}}"""
+ md_namespace = f"""{{{nsmap["md"]}}}"""
+
+ root = etree.Element(rdf_namespace + "RDF", nsmap=nsmap)
+
+ # FullModel header
+ model = etree.Element(md_namespace + "FullModel", nsmap=nsmap)
+ model.set(rdf_namespace + "about", "#" + fullmodel["id"])
+ for key, value in fullmodel["Model"].items():
+ element = etree.SubElement(model, md_namespace + "Model." + key)
+ element.text = value
+ root.append(model)
+
+ count = 0
+ for id, obj in self.objects.items():
+ obj_etree = obj.to_xml(profile_to_export=profile, id=id)
+ if obj_etree is not None:
+ root.append(obj_etree)
+ count += 1
+ if count > 0:
+ output = etree.ElementTree(root)
+ else:
+ output = None
+ return output