diff --git a/cimgen/cimgen.py b/cimgen/cimgen.py index dfea746..3f8cd79 100644 --- a/cimgen/cimgen.py +++ b/cimgen/cimgen.py @@ -20,6 +20,8 @@ def asJson(self): jsonObject = {} if self.about() is not None: jsonObject["about"] = self.about() + if self.namespace() is not None: + jsonObject["namespace"] = self.namespace() if self.comment() is not None: jsonObject["comment"] = self.comment() if self.dataType() is not None: @@ -51,6 +53,12 @@ def about(self): else: return None + def namespace(self): + if "$rdf:about" in self.jsonDefinition: + return self.jsonDefinition["$rdf:about"][: -len(self.about())] + else: + return None + # Capitalized True/False is valid in python but not in json. # Do not use this function in combination with json.load() def is_used(self) -> bool: @@ -211,6 +219,7 @@ def __init__(self, rdfsEntry): self.super = rdfsEntry.subClassOf() self.subclasses = [] self.stereotype = rdfsEntry.stereotype() + self.namespace = rdfsEntry.namespace() def attributes(self): return self.attribute_list @@ -405,6 +414,13 @@ def _parse_rdf(input_dic, version): # NOSONAR for instance in enum_instances: clarse = _get_rid_of_hash(instance["type"]) if clarse and clarse in classes_map: + if instance.get("comment"): + instance["wrapped_comment"] = wrap_and_clean( + instance["comment"], + width=100, + initial_indent="# ", + subsequent_indent=(" # "), + ) classes_map[clarse].add_enum_instance(instance) else: logger.info("Class {} for enum instance {} not found.".format(clarse, instance)) @@ -429,6 +445,7 @@ def _write_python_files(elem_dict, lang_pack, output_path, version): "class_location": lang_pack.get_class_location(class_name, elem_dict, version), "class_name": class_name, "class_origin": elem_dict[class_name].origins(), + "class_namespace": _get_namespace(elem_dict[class_name].namespace), "enum_instances": elem_dict[class_name].enum_instances(), "is_an_enum_class": elem_dict[class_name].is_an_enum_class(), "is_a_primitive_class": elem_dict[class_name].is_a_primitive_class(), @@ -467,6 +484,7 @@ def _write_python_files(elem_dict, lang_pack, output_path, version): attribute["is_primitive_attribute"] = _get_bool_string(attribute_type == "primitive") attribute["is_datatype_attribute"] = _get_bool_string(attribute_type == "datatype") attribute["attribute_class"] = attribute_class + attribute["attribute_namespace"] = _get_namespace(attribute["namespace"]) class_details["attributes"].sort(key=lambda d: d["label"]) _write_files(class_details, output_path, version) @@ -749,6 +767,14 @@ def _get_attribute_type(attribute: dict, class_infos: CIMComponentDefinition) -> return attribute_type +def _get_namespace(parsed_namespace: str) -> str: + if parsed_namespace == "#": + namespace = cim_namespace + else: + namespace = parsed_namespace + return namespace + + def _get_bool_string(bool_value: bool) -> str: """Convert boolean value into a string which is usable in both Python and Json. diff --git a/cimgen/languages/modernpython/lang_pack.py b/cimgen/languages/modernpython/lang_pack.py index 64cb410..58c5c1f 100644 --- a/cimgen/languages/modernpython/lang_pack.py +++ b/cimgen/languages/modernpython/lang_pack.py @@ -98,14 +98,6 @@ def _get_python_type(datatype): return "float" -def _set_imports(attributes): - import_set = set() - for attribute in attributes: - if attribute["is_datatype_attribute"] or attribute["is_primitive_attribute"]: - import_set.add(attribute["attribute_class"]) - return sorted(import_set) - - def _set_datatype_attributes(attributes) -> dict: datatype_attributes = {} datatype_attributes["python_type"] = "None" @@ -136,13 +128,20 @@ def run_template(output_path, class_details): template = class_template_file class_details["setDefault"] = _set_default class_details["setType"] = _set_type - class_details["imports"] = _set_imports(class_details["attributes"]) resource_file = _create_file(output_path, class_details, template) _write_templated_file(resource_file, class_details, template["filename"]) def _create_file(output_path, class_details, template) -> str: - resource_file = Path(output_path) / "resources" / (class_details["class_name"] + template["ext"]) + if ( + class_details["is_a_primitive_class"] + or class_details["is_a_datatype_class"] + or class_details["is_an_enum_class"] + ): + class_category = "types" + else: + class_category = "" + resource_file = Path(output_path) / "resources" / class_category / (class_details["class_name"] + template["ext"]) resource_file.parent.mkdir(exist_ok=True) return str(resource_file) diff --git a/cimgen/languages/modernpython/templates/class_template.mustache b/cimgen/languages/modernpython/templates/class_template.mustache index 5956170..da6ee72 100644 --- a/cimgen/languages/modernpython/templates/class_template.mustache +++ b/cimgen/languages/modernpython/templates/class_template.mustache @@ -10,9 +10,6 @@ from pydantic.dataclasses import dataclass from ..utils.profile import BaseProfile, Profile from {{class_location}} import {{sub_class_of}} -{{#imports}} -from .{{.}} import {{.}} -{{/imports}} @dataclass @@ -35,16 +32,17 @@ class {{class_name}}({{sub_class_of}}): {{/attr_origin}} ], "is_used": {{#is_used}}True{{/is_used}}{{^is_used}}False{{/is_used}}, + "namespace": "{{attribute_namespace}}", # NOSONAR "is_class_attribute": {{#is_class_attribute}}True{{/is_class_attribute}}{{^is_class_attribute}}False{{/is_class_attribute}}, "is_datatype_attribute": {{#is_datatype_attribute}}True{{/is_datatype_attribute}}{{^is_datatype_attribute}}False{{/is_datatype_attribute}}, "is_enum_attribute": {{#is_enum_attribute}}True{{/is_enum_attribute}}{{^is_enum_attribute}}False{{/is_enum_attribute}}, "is_list_attribute": {{#is_list_attribute}}True{{/is_list_attribute}}{{^is_list_attribute}}False{{/is_list_attribute}}, "is_primitive_attribute": {{#is_primitive_attribute}}True{{/is_primitive_attribute}}{{^is_primitive_attribute}}False{{/is_primitive_attribute}}, {{#is_datatype_attribute}} - "attribute_class": {{attribute_class}}, + "attribute_class": "{{attribute_class}}", {{/is_datatype_attribute}} {{#is_primitive_attribute}} - "attribute_class": {{attribute_class}}, + "attribute_class": "{{attribute_class}}", {{/is_primitive_attribute}} }, ) diff --git a/cimgen/languages/modernpython/templates/datatype_template.mustache b/cimgen/languages/modernpython/templates/datatype_template.mustache index 6d69021..a6d4d6a 100644 --- a/cimgen/languages/modernpython/templates/datatype_template.mustache +++ b/cimgen/languages/modernpython/templates/datatype_template.mustache @@ -2,8 +2,8 @@ Generated from the CGMES files via cimgen: https://github.com/sogno-platform/cimgen """ -from ..utils.datatypes import CIMDatatype -from ..utils.profile import Profile +from ...utils.datatypes import CIMDatatype +from ...utils.profile import Profile {{#isFixed_imports}} from .{{.}} import {{.}} {{/isFixed_imports}} diff --git a/cimgen/languages/modernpython/templates/enum_template.mustache b/cimgen/languages/modernpython/templates/enum_template.mustache index d0b3826..1c51ff6 100644 --- a/cimgen/languages/modernpython/templates/enum_template.mustache +++ b/cimgen/languages/modernpython/templates/enum_template.mustache @@ -7,9 +7,10 @@ from enum import Enum class {{class_name}}(str, Enum): """ - {{{class_comment}}} # noqa: E501 + {{{wrapped_class_comment}}} """ {{#enum_instances}} - {{label}} = "{{label}}"{{#comment}} # {{comment}}{{/comment}} # noqa: E501 + {{label}} = "{{label}}"{{#comment}} + {{wrapped_comment}}{{/comment}} {{/enum_instances}} diff --git a/cimgen/languages/modernpython/templates/primitive_template.mustache b/cimgen/languages/modernpython/templates/primitive_template.mustache index be44ab0..2a1a758 100644 --- a/cimgen/languages/modernpython/templates/primitive_template.mustache +++ b/cimgen/languages/modernpython/templates/primitive_template.mustache @@ -3,8 +3,8 @@ Generated from the CGMES files via cimgen: https://github.com/sogno-platform/cim """ from datetime import date, datetime, time -from ..utils.datatypes import Primitive -from ..utils.profile import Profile +from ...utils.datatypes import Primitive +from ...utils.profile import Profile {{class_name}} = Primitive( name="{{class_name}}", diff --git a/cimgen/languages/modernpython/utils/base.py b/cimgen/languages/modernpython/utils/base.py index f6f7570..1e5dc8c 100644 --- a/cimgen/languages/modernpython/utils/base.py +++ b/cimgen/languages/modernpython/utils/base.py @@ -1,8 +1,9 @@ # Drop in dataclass replacement, allowing easier json dump and validation in the future. import importlib +from lxml import etree from dataclasses import Field, fields from functools import cached_property -from typing import Any, TypeAlias, TypedDict +from typing import Any, TypeAlias, TypedDict, Optional from pydantic.dataclasses import dataclass @@ -89,6 +90,24 @@ def apparent_name(cls) -> str: """ return cls.__name__ + def get_attribute_main_profile(self, attr: str) -> BaseProfile | None: + """Get the profile for this attribute of the CIM object. + + This function searches for the profile of an attribute for the CIM type of an object. + If the main profile of the type is a possible profile of the attribute it should be choosen. + Otherwise, the first profile in the list of possible profiles ordered by profile number. + + :param attr: Attribute to check + :return: Attribute profile. + """ + attr_profiles_map = self.possible_attribute_profiles + profiles = attr_profiles_map.get(attr, []) + if self.recommended_profile in profiles: + return self.recommended_profile + if profiles: + return sorted(profiles)[0] + return None + def cgmes_attribute_names_in_profile(self, profile: BaseProfile | None) -> set[Field]: """ Returns all fields accross the parent tree which are in the profile in parameter. @@ -133,38 +152,217 @@ def cgmes_attributes_in_profile(self, profile: BaseProfile | None) -> dict[str, for f in fields(parent): shortname = f.name qualname = f"{parent.apparent_name()}.{shortname}" # type: ignore + infos = dict() + if f not in self.cgmes_attribute_names_in_profile(profile) or shortname in seen_attrs: - # Wrong profile or already found from a parent. continue - else: - # Namespace finding - # "class namespace" means the first namespace defined in the inheritance tree. - # This can go up to Base, which will give the default cim NS. - if (extra := getattr(f.default, "json_schema_extra", None)) is None: - # The attribute does not have extra metadata. It might be a custom atttribute - # without it, or a base type (int...). - # Use the class namespace. - namespace = self.namespace - elif (attr_ns := extra.get("namespace", None)) is None: - # The attribute has some extras, but not namespace. - # Use the class namespace. - namespace = self.namespace - else: + + # Namespace finding + # "class namespace" means the first namespace defined in the inheritance tree. + # This can go up to Base, which will give the default cim NS. + infos["namespace"] = self.namespace + extra = getattr(f.default, "json_schema_extra", None) + if extra and extra.get("is_used"): + # adding the extras, used for xml generation + extra_info = { + "attr_name": qualname, + "is_class_attribute": extra.get("is_class_attribute"), + "is_enum_attribute": extra.get("is_enum_attribute"), + "is_list_attribute": extra.get("is_list_attribute"), + "is_primitive_attribute": extra.get("is_primitive_attribute"), + "is_datatype_attribute": extra.get("is_datatype_attribute"), + "attribute_class": extra.get("attribute_class"), + "attribute_main_profile": self.get_attribute_main_profile(shortname), + } + if extra.get("namespace"): # The attribute has an explicit namesapce - namespace = attr_ns + extra_info["namespace"] = extra.get("namespace", self.namespace) + infos.update(extra_info) - qual_attrs[qualname] = CgmesAttribute( - value=getattr(self, shortname), - namespace=namespace, - ) - seen_attrs.add(shortname) + infos["value"] = getattr(self, shortname) + + qual_attrs[qualname] = CgmesAttribute(infos) + seen_attrs.add(shortname) return qual_attrs + def to_xml(self, profile_to_export: BaseProfile, id: Optional[str] = None) -> Optional[etree.Element]: + """Creates an etree element of self with all non-empty attributes of the profile_to_export + that are not already defined in the recommanded profile + This can then be used to generate the xml file of the profile_to_export + Args: + profile_to_export (Profile): Profile for which we want to obtain the xml tree (eg. Profile.EQ) + id (Optional[str], optional): "mRID", optional: some objects don't have mRID attribute. Defaults to None. + Returns: + Optional[etree.Element]: etree describing self for the profile_to_export, None if nothing to export + """ + profile_attributes = self._get_attributes_to_export(profile_to_export) + + if "mRID" in self.to_dict(): + obj_id = self.mRID + else: + obj_id = id + + # if no attribute to export or no mRID, return None + if profile_attributes == {} or obj_id is None: + root = None + else: + # Create root element + nsmap = NAMESPACES + root = etree.Element("{" + self.namespace + "}" + self.resource_name, nsmap=nsmap) + + # Add the ID as attribute to the root + if self.recommended_profile.value == profile_to_export.value: + root.set(f"""{{{nsmap["rdf"]}}}""" + "ID", obj_id) + else: + root.set(f"""{{{nsmap["rdf"]}}}""" + "about", "#" + obj_id) + + root = self._add_attribute_to_etree(attributes=profile_attributes, root=root, nsmap=nsmap) + return root + + def _get_attributes_to_export(self, profile_to_export: BaseProfile) -> dict: + attributes_to_export = {} + attributes_in_profile = self.cgmes_attributes_in_profile(profile_to_export) + for key, attribute in attributes_in_profile.items(): + if attribute["attribute_main_profile"] == profile_to_export: + attributes_to_export[key] = attribute + attributes_to_export = self._remove_empty_attributes(attributes_to_export) + return attributes_to_export + + @staticmethod + def _remove_empty_attributes(attributes: dict) -> dict: + for key, attribute in list(attributes.items()): + # Remove empty attributes + if attribute["value"] in [None, "", []]: + del attributes[key] + elif attribute.get("attribute_class") and attribute["attribute_class"] == "Boolean": + attribute["value"] = str(attribute["value"]).lower() + return attributes + + @staticmethod + def _add_attribute_to_etree(attributes: dict, root: etree.Element, nsmap: dict) -> etree.Element: + rdf_namespace = f"""{{{nsmap["rdf"]}}}""" + for field_name, attribute in attributes.items(): + # add all attributes relevant to the profile as SubElements + attr_namespace = attribute["namespace"] + element_name = f"{{{attr_namespace}}}{field_name}" + + if attribute["is_class_attribute"]: + # class_attributes are exported as rdf: resource #mRID_of_target + element = etree.SubElement(root, element_name) + element.set(rdf_namespace + "resource", "#" + attribute["value"]) + elif attribute["is_enum_attribute"]: + element = etree.SubElement(root, element_name) + element.set(rdf_namespace + "resource", nsmap["cim"] + attribute["value"]) + else: + element = etree.SubElement(root, element_name) + element.text = str(attribute["value"]) + return root + def __str__(self) -> str: """Returns the string representation of this resource.""" return "\n".join([f"{k}={v}" for k, v in sorted(self.to_dict().items())]) + def _parse_xml_fragment(self, xml_fragment: str) -> dict: + """parses an xml fragment into a dict defining the class attributes + + Args: + xml_fragment (str): xml string defining an instance of the current class + + Returns: + attribute_dict (dict): a dictionnary of attributes to create/update the class instance + """ + attribute_dict = {} + xml_tree = etree.fromstring(xml_fragment) + + # raise an error if the xml does not describe the expected class + if not xml_tree.tag.endswith(self.resource_name): + raise (KeyError(f"The fragment does not correspond to the class {self.resource_name}")) + + attribute_dict.update(self._extract_mrid_from_etree(xml_tree=xml_tree)) + + # parsing attributes defined in class + class_attributes = self.cgmes_attributes_in_profile(None) + for key, class_attribute in class_attributes.items(): + xml_attribute = xml_tree.findall(".//{*}" + key) + if len(xml_attribute) != 1: + continue + xml_attribute = xml_attribute[0] + attr = key.rsplit(".")[-1] + + attr_value = self._extract_attr_value_from_etree(attr, class_attribute, xml_attribute) + if hasattr(self, attr) and attr_value is not None: + attribute_dict[attr] = attr_value + + return attribute_dict + + def _extract_mrid_from_etree(self, xml_tree: etree.Element) -> dict: + """Parsing the mRID from etree attributes""" + mrid_dict = {} + for key, value in xml_tree.attrib.items(): + if key.endswith("ID") or key.endswith("about"): + if value.startswith("#"): + value = value[1:] + if hasattr(self, "mRID") and value is not None: + mrid_dict = {"mRID": value} + return mrid_dict + + def _extract_attr_value_from_etree(self, attr_name: str, class_attribute: dict, xml_attribute: etree.Element): + """Parsing the attribute value from etree attributes""" + attr_value = None + # class attributes are pointing to another class/instance defined in .attrib + if class_attribute["is_class_attribute"] and len(xml_attribute.keys()) == 1: + attr_value = xml_attribute.values()[0] + if attr_value.startswith("#"): + attr_value = attr_value[1:] + + # enum attributes are defined in .attrib and has a prefix ending in "#" + elif class_attribute["is_enum_attribute"] and len(xml_attribute.keys()) == 1: + attr_value = xml_attribute.values()[0] + if "#" in attr_value: + attr_value = attr_value.split("#")[-1] + + elif class_attribute["is_list_attribute"]: + attr_value = eval(xml_attribute.text) + elif class_attribute["is_primitive_attribute"] or class_attribute["is_datatype_attribute"]: + attr_value = xml_attribute.text + if self.__dataclass_fields__[attr_name].type == bool: + attr_value = {"true": True, "false": False}.get(attr_value, None) + else: + # types are int, float or str (date, time and datetime treated as str) + attr_value = self.__dataclass_fields__[attr_name].type(attr_value) + else: + # other attributes types are defined in .text + attr_value = xml_attribute.text + return attr_value + + def update_from_xml(self, xml_fragment: str): + """ + Updates the instance by parsing an xml fragment defining the attributes of this instance + example: updating the instance by parsing the corresponding fragment from the SSH profile + """ + attribute_dict = self._parse_xml_fragment(xml_fragment) + + if attribute_dict["mRID"] == self.mRID: + for key, value in attribute_dict.items(): + attr = getattr(self, key) + if isinstance(attr, list): + getattr(self, key).extend(value) + else: + setattr(self, key, value) + + @classmethod + def from_xml(cls, xml_fragment: str): + """ + Returns an instance of the class from an xml fragment defining the attributes written in the form: + ... + example: creating an instance by parsing a fragment from the EQ profile + """ + attribute_dict = cls()._parse_xml_fragment(xml_fragment) + + # Instantiate the class with the dictionary + return cls(**attribute_dict) + @staticmethod def get_extra_prop(field: Field, prop: str) -> Any: # The field is defined as a pydantic field, not a dataclass field, diff --git a/cimgen/languages/modernpython/utils/datatypes.py b/cimgen/languages/modernpython/utils/datatypes.py index db3d6ba..a249c93 100644 --- a/cimgen/languages/modernpython/utils/datatypes.py +++ b/cimgen/languages/modernpython/utils/datatypes.py @@ -6,9 +6,9 @@ from .config import cgmes_resource_config from .profile import BaseProfile -from ..resources.UnitMultiplier import UnitMultiplier -from ..resources.UnitSymbol import UnitSymbol -from ..resources.Currency import Currency +from ..resources.types.UnitMultiplier import UnitMultiplier +from ..resources.types.UnitSymbol import UnitSymbol +from ..resources.types.Currency import Currency @dataclass(config=cgmes_resource_config) diff --git a/cimgen/languages/modernpython/utils/reader.py b/cimgen/languages/modernpython/utils/reader.py new file mode 100644 index 0000000..386ce1c --- /dev/null +++ b/cimgen/languages/modernpython/utils/reader.py @@ -0,0 +1,179 @@ +from lxml import etree +import importlib +import logging +from .profile import Profile +from pydantic import BaseModel, Field +from typing import Dict, Optional, Literal + + +logger = logging.getLogger(__name__) + + +class Reader(BaseModel): + """Parses profiles to create the corresponding python objects + + Args: + cgmes_version_path (str): Path to the cgmes resources folder containing the class definition + custom_namespaces (Optional[[str, str]]): {"namespace_prefix": "namespace_uri"} + custom_folder (Optional[str]): "path_to_custom_resources_folder" + """ + + cgmes_version_path: str + custom_namespaces: Optional[Dict[str, str]] = None + custom_folder: Optional[str] = None + logger_grouped: Dict[str, Dict[str, int]] = Field(default_factory=lambda: {"errors": {}, "info": {}}) + import_result: Dict = Field(default_factory=lambda: {"meta_info": {}, "topology": {}}) + + def parse_profiles(self, xml_files: list[str], start_dict: Optional[Dict] = None): + """Parses all profiles contained in xml_files and returns a list containing + all the objects defined in the profiles "mRID": Object\n + Errors encounterd in the parsing can be recovered in Reader.logger_grouped + + Args: + xml_files (list): list with the path to all the profiles to parse + start_dict (Optional[Dict]): To parse profiles on top of an existing list dict(meta_info, topology) + + Returns: + list: ["topology": dict of all the objects defined in the profiles {"mRID": Object}, "meta_info"] + """ + if start_dict is not None: + self.import_result = start_dict + self.import_result["meta_info"] = dict(namespaces=self._get_namespaces(xml_files[0]), urls={}) + namespace_rdf = self._get_rdf_namespace() + + bases = ["{" + self.import_result["meta_info"]["namespaces"]["cim"] + "}"] + if self.custom_namespaces: + for custom_namespace in self.custom_namespaces.values(): + bases.append("{" + custom_namespace + "}") + bases = tuple(bases) + + for xml_file in xml_files: + self._instantiate_classes(xml_file=xml_file, bases=bases, namespace_rdf=namespace_rdf) + return self.import_result + + def _instantiate_classes(self, xml_file: str, bases: tuple, namespace_rdf: str): + """creates/updates the python objects with the information of xml_file + + Args: + xml_file (str): Path to the profile + bases (tuple): contains the possible namespaces uris defining the classes, can be custom + namespace_rdf (str): rdf namespace uri + """ + context = etree.iterparse(xml_file, ("start", "end")) + level = 0 + + for event, elem in context: + if event == "end": + level -= 1 + if event == "start": + level += 1 + + class_namespace = next((namespace for namespace in bases if elem.tag.startswith(namespace)), None) + if event == "start" and class_namespace is not None and level == 2: + class_name, uuid = self._extract_classname_uuid(elem, class_namespace, namespace_rdf) + if uuid is not None: + self._process_element(class_name, uuid, elem) + # Check which package is read + elif event == "end": + self._check_metadata(elem) + + @staticmethod + def _extract_classname_uuid(elem, class_namespace: str, namespace_rdf: str) -> tuple: + """Extracts class name and instance uuid ("mRID") + + Args: + elem (etree.Element): description of the instance for the given profile + class_namespace (str): namespace uri defining the class + namespace_rdf (str): rdf namespace uri + + Returns: + tuple: (class_name: example "ACLineSgement", instance_uuid: "mRID") + """ + class_name = elem.tag[len(class_namespace) :] + uuid = elem.get("{%s}ID" % namespace_rdf) + if uuid is None: + uuid = elem.get("{%s}about" % namespace_rdf) + if uuid is not None: + uuid = uuid[1:] + return class_name, uuid + + def _process_element(self, class_name: str, uuid: str, elem): + """Creates or updates (if an object with the same uuid exists) + an instance of the class based on the fragment of the profile + + Args: + class_name (str): Name of the class of the instance to create/update (example: ACLineSegment) + uuid (str): mRID + elem (etree.Element): description of the instance for the given profile + """ + topology = self.import_result["topology"] + elem_str = etree.tostring(elem, encoding="utf8") + try: + # Import the module for the CGMES object. + module_name = self._get_path_to_module(class_name) + module = importlib.import_module(module_name) + + klass = getattr(module, class_name) + if uuid not in topology: + topology[uuid] = klass().from_xml(elem_str) + info_msg = "CIM object {} created".format(module_name.split(".")[-1]) + else: + obj = topology[uuid] + obj.update_from_xml(elem_str) + info_msg = "CIM object {} updated".format(module_name.split(".")[-1]) + self._log_message("info", info_msg) + + except ModuleNotFoundError: + error_msg = "Module {} not implemented".format(class_name) + self._log_message("errors", error_msg) + except Exception as e: + error_msg = "Could not create/update {}, {}".format(uuid, e) + self._log_message("errors", error_msg) + + def _check_metadata(self, elem): + if "Model.profile" in elem.tag: + for package_key in [e.value for e in Profile]: + if package_key in elem.text: + break + # the author of all imported files should be the same, avoid multiple entries + elif "author" not in self.import_result["meta_info"].keys(): + if any(author_field in elem.tag for author_field in ("Model.createdBy", "Model.modelingAuthoritySet")): + self.import_result["meta_info"]["author"] = elem.text + + # Returns a map of class_namespace to namespace for the given XML file. + @staticmethod + def _get_namespaces(source) -> Dict: + namespaces = {} + events = ("end", "start-ns", "end-ns") + for event, elem in etree.iterparse(source, events): + if event == "start-ns": + class_namespace, ns = elem + namespaces[class_namespace] = ns + elif event == "end": + break + + # Reset stream + if hasattr(source, "seek"): + source.seek(0) + + return namespaces + + # Returns the RDF Namespace from the namespaces dictionary + def _get_rdf_namespace(self) -> str: + try: + namespace = self.import_result["meta_info"]["namespaces"]["rdf"] + except KeyError: + namespace = "http://www.w3.org/1999/02/22-rdf-syntax-ns#" # NOSONAR + logger.warning("No rdf namespace found. Using %s" % namespace) + return namespace + + def _get_path_to_module(self, class_name: str) -> str: + if self.custom_folder and importlib.find_loader(self.custom_folder + "." + class_name): + path_to_module = self.custom_folder + "." + class_name + else: + path_to_module = self.cgmes_version_path + "." + class_name + return path_to_module + + def _log_message(self, log_type: Literal["errors", "info"], message: str): + self.logger_grouped[log_type].setdefault(message, 0) + self.logger_grouped[log_type][message] += 1 diff --git a/cimgen/languages/modernpython/utils/writer.py b/cimgen/languages/modernpython/utils/writer.py new file mode 100644 index 0000000..036c47e --- /dev/null +++ b/cimgen/languages/modernpython/utils/writer.py @@ -0,0 +1,104 @@ +from lxml import etree +from pydantic import BaseModel +from .constants import NAMESPACES +from .profile import BaseProfile, Profile +from typing import Dict, Optional + + +class Writer(BaseModel): + """Class for writing CIM RDF/XML files + + Args: + objects (dict): Mapping of rdfid to CIM object + Model_metadata (Optional[Dict[str, str]]): any additional data to add in header + default = {"modelingAuthoritySet": "www.sogno.energy" } + """ + + objects: Dict + writer_metadata: Dict[str, str] = {} + + def write( + self, + outputfile: str, + model_id: str, + class_profile_map: Dict[str, BaseProfile] = {}, + custom_namespaces: Dict[str, str] = {}, + ) -> dict[BaseProfile, str]: + """Write CIM RDF/XML files. + This function writes CIM objects into one or more RDF/XML files separated by profiles. + Each CIM object will be written to its corresponding profile file depending on class_profile_map. + But some objects to more than one file if some attribute profiles are not the same as the class profile. + + Args: + outputfile (str): Stem of the output file, resulting files: _.xml. + model_id (str): Stem of the model IDs, resulting IDs: _. + class_profile_map Optional[Dict[str, str]: Mapping of CIM type to profile. + custom_namespaces Optional[Dict[str, str]: {"namespace_prefix": "namespace_uri"} + + Returns: + Mapping of profile to outputfile. + """ + profile_list: list[BaseProfile] = list(Profile) + if class_profile_map: + profile_list += {p for p in class_profile_map.values() if p not in profile_list} + profile_file_map: dict[BaseProfile, str] = {} + for profile in profile_list: + profile_name = profile.long_name + full_file_name = outputfile + "_" + profile.long_name + ".xml" + output = self._generate(profile, model_id + "_" + profile_name, custom_namespaces) + if output: + output.write(full_file_name, pretty_print=True, xml_declaration=True, encoding="UTF-8") + profile_file_map[profile] = full_file_name + return profile_file_map + + def _generate( + self, profile: BaseProfile, model_id: str, custom_namespaces: Dict[str, str] = {} + ) -> Optional[etree.ElementTree]: + """Write CIM objects as RDF/XML data to a string. + + This function creates RDF/XML tree corresponding to one profile. + + Args: + profile (BaseProfile): Only data for this profile should be written. + model_id (str): Stem of the model IDs, resulting IDs: _. + custom_namespaces Optional[Dict[str, str]]: {"namespace_prefix": "namespace_uri"} + + Returns: + etree of the profile + """ + writer_info = {"modelingAuthoritySet": "www.sogno.energy"} + writer_info.update(self.writer_metadata) + fullmodel = { + "id": model_id, + "Model": writer_info, + } + for uri in profile.uris: + fullmodel["Model"].update({"profile": uri}) + + nsmap = NAMESPACES + nsmap.update(custom_namespaces) + + rdf_namespace = f"""{{{nsmap["rdf"]}}}""" + md_namespace = f"""{{{nsmap["md"]}}}""" + + root = etree.Element(rdf_namespace + "RDF", nsmap=nsmap) + + # FullModel header + model = etree.Element(md_namespace + "FullModel", nsmap=nsmap) + model.set(rdf_namespace + "about", "#" + fullmodel["id"]) + for key, value in fullmodel["Model"].items(): + element = etree.SubElement(model, md_namespace + "Model." + key) + element.text = value + root.append(model) + + count = 0 + for id, obj in self.objects.items(): + obj_etree = obj.to_xml(profile_to_export=profile, id=id) + if obj_etree is not None: + root.append(obj_etree) + count += 1 + if count > 0: + output = etree.ElementTree(root) + else: + output = None + return output