Skip to content

Commit

Permalink
Refactor functions to_xml and _parse_xml_fragment to split in differe…
Browse files Browse the repository at this point in the history
…nt subfunction and improve readability

Signed-off-by: HUG0-D <[email protected]>
  • Loading branch information
HUG0-D committed Nov 20, 2024
1 parent 0923d3c commit cd03082
Showing 1 changed file with 95 additions and 87 deletions.
182 changes: 95 additions & 87 deletions cimgen/languages/modernpython/utils/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,27 +135,14 @@ def cgmes_attributes_in_profile(self, profile: BaseProfile | None) -> dict[str,
shortname = f.name
qualname = f"{parent.apparent_name()}.{shortname}" # type: ignore
infos = dict()
if f not in self.cgmes_attribute_names_in_profile(profile) or shortname in seen_attrs:
# Wrong profile or already found from a parent.
continue
else:
if f in self.cgmes_attribute_names_in_profile(profile) and shortname not in seen_attrs:
# Namespace finding
# "class namespace" means the first namespace defined in the inheritance tree.
# This can go up to Base, which will give the default cim NS.
if (extra := getattr(f.default, "json_schema_extra", None)) is None:
# The attribute does not have extra metadata. It might be a custom atttribute
# without it, or a base type (int...).
# Use the class namespace.
infos["namespace"] = self.namespace
elif extra.get("is_used"):
if (extra.get("attribute_namespace", None)) is None:
# The attribute has some extras, but not namespace.
# Use the class namespace.
infos["namespace"] = self.namespace

else:
# The attribute has an explicit namesapce
infos["namespace"] = extra.get("attribute_namespace", self.namespace)

infos["namespace"] = self.namespace
extra = getattr(f.default, "json_schema_extra", None)
if extra is not None and extra.get("is_used"):
# adding the extras, used for xml generation
extra_info = {
"attr_name": qualname,
Expand All @@ -166,6 +153,9 @@ def cgmes_attributes_in_profile(self, profile: BaseProfile | None) -> dict[str,
"is_datatype_attribute": extra.get("is_datatype_attribute"),
"attribute_class": extra.get("attribute_class"),
}
if (extra.get("attribute_namespace", None)) is not None:
# The attribute has an explicit namesapce
extra_info["namespace"] = extra.get("attribute_namespace", self.namespace)
infos.update(extra_info)

infos["value"] = getattr(self, shortname)
Expand All @@ -185,16 +175,7 @@ def to_xml(self, profile_to_export: BaseProfile, id: Optional[str] = None) -> Op
Returns:
Optional[etree.Element]: etree describing self for the profile_to_export, None if nothing to export
"""
profile_attributes = self.cgmes_attributes_in_profile(profile_to_export)
is_recommanded_profile = self.recommended_profile.value == profile_to_export.value

if not is_recommanded_profile:
# remove attributes that are also present in "recommended_profile"
attributes_main = self.cgmes_attributes_in_profile(self.recommended_profile)
for key in attributes_main.keys():
if key in profile_attributes:
del profile_attributes[key]
profile_attributes = self._remove_empty_attributes(profile_attributes)
profile_attributes = self._get_attributes_to_export(profile_to_export)

if "mRID" in self.to_dict():
obj_id = "_" + self.mRID
Expand All @@ -205,34 +186,31 @@ def to_xml(self, profile_to_export: BaseProfile, id: Optional[str] = None) -> Op
if profile_attributes == {} or obj_id is None:
root = None
else:
nsmap = NAMESPACES
# Create root element
nsmap = NAMESPACES
root = etree.Element("{" + self.namespace + "}" + self.resource_name, nsmap=nsmap)

# Add the ID ass attribute to the root
rdf_namespace = f"""{{{nsmap["rdf"]}}}"""
if is_recommanded_profile:
root.set(rdf_namespace + "ID", obj_id)
# Add the ID as attribute to the root
if self.recommended_profile.value == profile_to_export.value:
root.set(f"""{{{nsmap["rdf"]}}}""" + "ID", obj_id)
else:
root.set(rdf_namespace + "about", "#" + obj_id)

for field_name, attribute in profile_attributes.items():
# add all attributes relevant to the profile as SubElements
attr_namespace = attribute["namespace"]
element_name = f"{{{attr_namespace}}}{field_name}"

if attribute["is_class_attribute"]:
# class_attributes are exported as rdf: resource #_mRID_of_target
element = etree.SubElement(root, element_name)
element.set(rdf_namespace + "resource", "#" + attribute["value"])
elif attribute["is_enum_attribute"]:
element = etree.SubElement(root, element_name)
element.set(rdf_namespace + "resource", nsmap["cim"] + attribute["value"])
else:
element = etree.SubElement(root, element_name)
element.text = str(attribute["value"])
root.set(f"""{{{nsmap["rdf"]}}}""" + "about", "#" + obj_id)

root = self._add_attribute_to_etree(attributes=profile_attributes, root=root, nsmap=nsmap)
return root

def _get_attributes_to_export(self, profile_to_export: BaseProfile) -> dict:
attributes_to_export = self.cgmes_attributes_in_profile(profile_to_export)
is_recommanded_profile = self.recommended_profile.value == profile_to_export.value
if not is_recommanded_profile:
# remove attributes that are also present in "recommended_profile"
attributes_main = self.cgmes_attributes_in_profile(self.recommended_profile)
for key in attributes_main.keys():
if key in attributes_to_export:
del attributes_to_export[key]
attributes_to_export = self._remove_empty_attributes(attributes_to_export)
return attributes_to_export

@staticmethod
def _remove_empty_attributes(attributes: dict) -> dict:
for key, attribute in list(attributes.items()):
Expand All @@ -249,6 +227,26 @@ def _remove_empty_attributes(attributes: dict) -> dict:
attribute["value"] = str(attribute["value"]).lower()
return attributes

@staticmethod
def _add_attribute_to_etree(attributes: dict, root: etree.Element, nsmap: dict) -> etree.Element:
rdf_namespace = f"""{{{nsmap["rdf"]}}}"""
for field_name, attribute in attributes.items():
# add all attributes relevant to the profile as SubElements
attr_namespace = attribute["namespace"]
element_name = f"{{{attr_namespace}}}{field_name}"

if attribute["is_class_attribute"]:
# class_attributes are exported as rdf: resource #_mRID_of_target
element = etree.SubElement(root, element_name)
element.set(rdf_namespace + "resource", "#" + attribute["value"])
elif attribute["is_enum_attribute"]:
element = etree.SubElement(root, element_name)
element.set(rdf_namespace + "resource", nsmap["cim"] + attribute["value"])
else:
element = etree.SubElement(root, element_name)
element.text = str(attribute["value"])
return root

def __str__(self) -> str:
"""Returns the string representation of this resource."""
return "\n".join([f"{k}={v}" for k, v in sorted(self.to_dict().items())])
Expand All @@ -269,15 +267,7 @@ def _parse_xml_fragment(self, xml_fragment: str) -> dict:
if not xml_tree.tag.endswith(self.resource_name):
raise (KeyError(f"The fragment does not correspond to the class {self.resource_name}"))

# parsing the mRID
for key, value in xml_tree.attrib.items():
if key.endswith("ID") or key.endswith("about"):
if value.startswith("#"):
value = value[1:]
if value.startswith("_"):
value = value[1:]
if hasattr(self, "mRID") and value is not None:
attribute_dict["mRID"] = value
attribute_dict.update(self._extract_mRID_from_etree(xml_tree=xml_tree))

# parsing attributes defined in class
class_attributes = self.cgmes_attributes_in_profile(None)
Expand All @@ -287,39 +277,57 @@ def _parse_xml_fragment(self, xml_fragment: str) -> dict:
continue
xml_attribute = xml_attribute[0]
attr = key.rsplit(".")[-1]
attr_value = None

# class attributes are pointing to another class/instance defined in .attrib
if class_attribute["is_class_attribute"]:
if len(xml_attribute.keys()) == 1:
attr_value = xml_attribute.values()[0]
if attr_value.startswith("#"):
attr_value = attr_value[1:]

# enum attributes are defined in .attrib and has a prefix ending in "#"
elif class_attribute["is_enum_attribute"]:
if len(xml_attribute.keys()) == 1:
attr_value = xml_attribute.values()[0]
if "#" in attr_value:
attr_value = attr_value.rsplit("#")[-1]

elif class_attribute["is_list_attribute"]:
# other attributes types are defined in .text
attr_value = xml_attribute
attr_value = self.key.append(attr_value)
else:
attr_value = xml_attribute.text
# primitive classes are described in "cim_data_type" allowing to retrieve the data type
if class_attribute["is_primitive_attribute"] or class_attribute["is_datatype_attribute"]:
if class_attribute["attribute_class"].type == bool:
attr_value = {"true": True, "false": False}.get(attr_value, None)
else:
attr_value = class_attribute["attribute_class"].type(attr_value)

attr_value = self._extract_attr_value_from_etree(class_attribute, xml_attribute)
if hasattr(self, attr) and attr_value is not None:
attribute_dict[attr] = attr_value

return attribute_dict

def _extract_mRID_from_etree(self, xml_tree: etree.Element) -> dict:
"""Parsing the mRID from etree attributes"""
mRID_dict = {}
for key, value in xml_tree.attrib.items():
if key.endswith("ID") or key.endswith("about"):
if value.startswith("#"):
value = value[1:]
if value.startswith("_"):
value = value[1:]
if hasattr(self, "mRID") and value is not None:
mRID_dict = {"mRID": value}
return mRID_dict

def _extract_attr_value_from_etree(self, class_attribute: dict, xml_attribute: dict):
"""Parsing the attribute value from etree attributes"""
attr_value = None
# class attributes are pointing to another class/instance defined in .attrib
if class_attribute["is_class_attribute"]:
if len(xml_attribute.keys()) == 1:
attr_value = xml_attribute.values()[0]
if attr_value.startswith("#"):
attr_value = attr_value[1:]

# enum attributes are defined in .attrib and has a prefix ending in "#"
elif class_attribute["is_enum_attribute"]:
if len(xml_attribute.keys()) == 1:
attr_value = xml_attribute.values()[0]
if "#" in attr_value:
attr_value = attr_value.rsplit("#")[-1]

elif class_attribute["is_list_attribute"]:
# other attributes types are defined in .text
attr_value = xml_attribute
attr_value = self.key.append(attr_value)
else:
attr_value = xml_attribute.text
# primitive classes are described in "cim_data_type" allowing to retrieve the data type
if class_attribute["is_primitive_attribute"] or class_attribute["is_datatype_attribute"]:
if class_attribute["attribute_class"].type == bool:
attr_value = {"true": True, "false": False}.get(attr_value, None)
else:
attr_value = class_attribute["attribute_class"].type(attr_value)
return attr_value

def update_from_xml(self, xml_fragment: str):
"""
Updates the instance by parsing an xml fragment defining the attributes of this instance
Expand Down

0 comments on commit cd03082

Please sign in to comment.