Skip to content

Commit

Permalink
Skip unsupported IANA registries in the converter
Browse files Browse the repository at this point in the history
Warn on unsupported values and skip the conversion of the related
registry

Ref. eng/recordflux/RecordFlux#1406
  • Loading branch information
andrestt committed Sep 12, 2023
1 parent 1c15a8d commit 84ea01e
Show file tree
Hide file tree
Showing 3 changed files with 269 additions and 49 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Reject duplicate optional arguments in `rflx` CLI (eng/recordflux/RecordFlux#1342)
- Split the `Valid_Context` into multiple functions (eng/recordflux/RecordFlux#1385)
- IANA registries with unsupported content are skipped with a warning (eng/recordflux/RecordFlux#1406)

### Removed

Expand Down
184 changes: 147 additions & 37 deletions rflx/converter/iana.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import re
import string
import textwrap
from abc import ABC, abstractmethod
from collections.abc import Sequence
from datetime import datetime
from pathlib import Path
Expand All @@ -12,10 +13,13 @@

import rflx.specification.const
from rflx.common import file_name
from rflx.error import Location, Subsystem, fail
from rflx.error import Location, RecordFluxError, Subsystem, fail, warn

NAMESPACE = {"iana": "http://www.iana.org/assignments"}
RESERVED_WORDS = "|".join(rflx.specification.const.RESERVED_WORDS)
OUTPUT_INDENT_CHAR = " "
OUTPUT_INDENT = OUTPUT_INDENT_CHAR * 3
OUTPUT_WIDTH = 80


def convert(
Expand Down Expand Up @@ -44,17 +48,35 @@ def convert(
package_name = _normalize_name(registry_id)
registry_last_updated = root.find("iana:updated", NAMESPACE)
registry_title = root.find("iana:title", NAMESPACE)

enum_types = []
for registry in root.findall(root.tag):
enum_type = _convert_registry_to_enum_type(registry, always_valid)
if enum_type is not None:
enum_types.append(enum_type)
package_entries: list[SpecificationElement] = []
for sub_registry in root.findall(root.tag):
try:
enum_type = _convert_registry_to_enum_type(sub_registry, always_valid)
if enum_type is not None:
package_entries.append(enum_type)
except RecordFluxError as e: # noqa: PERF203
sub_registry_id = sub_registry.get("id")
sub_registry_title = sub_registry.find("iana:title", NAMESPACE)
title_str = f'"{sub_registry_title.text}"' if sub_registry_title is not None else ""
id_str = f' (id={sub_registry_id if sub_registry_id is not None else "<Unknown>"})'
err_msg = f"{e}"[len("converter: error: ") :]
warn(
f"registry {title_str}{id_str} skipped due to conversion error: {err_msg}",
subsystem=Subsystem.CONVERTER,
)
package_entries.append(
CommentBlock(
f"type {_normalize_name(title_str)}\n"
"... Skipped due to unsupported content in the registry ...",
trailing_lines=1,
),
)
enum_types = [x for x in package_entries if isinstance(x, EnumType)]
resolve_duplicate_literals(enum_types)
file = output_dir / Path(file_name(package_name) + ".rflx")
write_rflx_specification(
file,
enum_types,
package_entries,
package_name,
registry_title,
registry_last_updated,
Expand All @@ -64,29 +86,36 @@ def convert(

def write_rflx_specification(
file: Path,
enum_types: Sequence[EnumType],
package_entries: list[SpecificationElement],
package_name: str,
registry_title: Optional[Element],
registry_last_updated: Optional[Element],
reproducible: bool,
) -> None:
with file.open("w+", encoding="utf-8") as f:
f.write(
"-- AUTOMATICALLY GENERATED. DO NOT EDIT.\n"
package_header: list[CommentBlock] = []

package_header.append(
CommentBlock(
"AUTOMATICALLY GENERATED. DO NOT EDIT."
+ (
""
if reproducible
else f"-- Generation date: {datetime.now().strftime('%Y-%m-%d')}\n" # noqa: DTZ005
else f"\nGeneration date: {datetime.now().strftime('%Y-%m-%d')}" # noqa: DTZ005
),
),
)
if registry_title is not None:
package_header.append(CommentBlock(f"{registry_title.text}"))
if registry_last_updated is not None:
package_header.append(
CommentBlock(
f"Registry last updated on {registry_last_updated.text}",
trailing_lines=1,
),
)
if registry_title is not None:
f.write(f"-- {registry_title.text}\n")
if registry_last_updated is not None:
f.write(f"-- Registry last updated on {registry_last_updated.text}\n\n")
f.write(f"package {package_name} is\n\n")
for enum_type in enum_types:
f.write(str(enum_type))
f.write(f"end {package_name};\n")

package = Package(package_name, package_header, package_entries)
file.write_text(package.rflx_str(), encoding="utf-8")


def resolve_duplicate_literals(enum_types: Sequence[EnumType]) -> None:
Expand Down Expand Up @@ -179,7 +208,44 @@ def _get_name_tag(record: Element) -> Optional[str]:
return None


class EnumType:
class SpecificationElement(ABC):
"""Base class for RecordFlux specification elements."""

def __str__(self) -> str:
"""
Render the current element in RecordFlux concrete syntax.
Note: If relative indentation is needed, then the method 'rflx_str' should be used instead.
"""
return self.rflx_str()

@abstractmethod
def rflx_str(self, indent_level: int = 0) -> str:
"""Render the current element in RecordFlux concrete syntax with given base indentation."""


class Package(SpecificationElement):
def __init__(
self,
name: str,
header: Sequence[CommentBlock],
entries: Sequence[SpecificationElement],
):
self.name = name
self.header = header
self.entries = entries

def rflx_str(self, indent_level: int = 0) -> str:
base_indent = OUTPUT_INDENT * indent_level
str_repr = ""
str_repr += "".join(r.rflx_str(indent_level) for r in self.header)
str_repr += f"{base_indent}package {self.name} is\n\n"
str_repr += "".join(r.rflx_str(indent_level + 1) for r in self.entries)
str_repr += f"{base_indent}end {self.name};\n"
return str_repr


class EnumType(SpecificationElement):
def __init__(
self,
type_name: str,
Expand All @@ -192,18 +258,21 @@ def __init__(
self.type_size = type_size
self.always_valid = always_valid and len(self.enum_literals) < 2**self.type_size

def __str__(self) -> str:
formatted_enum_literals = ",\n".join(str(r) for r in self.enum_literals)
def rflx_str(self, indent_level: int = 0) -> str:
base_indent = OUTPUT_INDENT * indent_level
formatted_enum_literals = ",\n".join(
r.rflx_str(indent_level + 1) for r in self.enum_literals
)
return (
f"{'':<3}type {self.type_name} is\n"
f"{'':<6}({formatted_enum_literals.lstrip()})\n"
f"{'':<3}with Size => {self.type_size}"
f"{base_indent}type {self.type_name} is\n"
f"{base_indent + OUTPUT_INDENT}({formatted_enum_literals.lstrip()})\n"
f"{base_indent}with Size => {self.type_size}"
f"{', Always_Valid;' if self.always_valid else ';'}"
f"\n\n"
)


class EnumLiteral:
class EnumLiteral(SpecificationElement):
def __init__(
self,
rflx_name: str,
Expand All @@ -222,8 +291,8 @@ def join(self, duplicate: EnumLiteral, registry_name: str) -> None:
self.comment_list.extend(duplicate.comment_list)
self.alternative_names.append(f"alternative_name = {duplicate.name}")

@property
def comment(self) -> list[str]:
def comment(self, indent_level: int) -> list[str]:
base_indent = OUTPUT_INDENT * indent_level
comments = [
f"{c.tag[c.tag.index('}') + 1:]} = {c.text}"
if c.tag is not None and c.text is not None
Expand All @@ -234,31 +303,65 @@ def comment(self) -> list[str]:
comments.extend(self.alternative_names)

formatted_comments = [
textwrap.wrap(comment, width=80, subsequent_indent=f"{'':<7}-- ")
textwrap.wrap(comment, width=OUTPUT_WIDTH, subsequent_indent=f"{base_indent} -- ")
for comment in comments
]
if len(formatted_comments) > 0:
return ["\n".join(formatted_comment) for formatted_comment in formatted_comments]
return []

def __str__(self) -> str:
def rflx_str(self, indent_level: int = 0) -> str:
base_indent = OUTPUT_INDENT * indent_level
str_repr = ""
comment = self.comment
comment = self.comment(indent_level)
if comment:
str_repr += "\n"
for comment_line in comment:
str_repr += f"{'':<7}-- {comment_line}\n"
str_repr += f"{base_indent} -- {comment_line}\n"
name = (
self.name
if len(self.name) <= 100
else self.name[:100]
+ ("_" if self.name[99] != "_" else "")
+ self.value.replace("16#", "").replace("#", "")
)
str_repr += f"{'':<7}{f'{name} => {self.value}'}"
str_repr += f"{base_indent} {f'{name} => {self.value}'}"
return str_repr


class CommentBlock(SpecificationElement):
"""
A textual comment block in the generated source code.
An instance of this class holds one text string that can contain multiple lines. Comment
characters (i.e., '--' for RecordFlux) and leading whitespace should *not* be included. When the
object is converted to string leading whitespace and comment characters are added automatically.
Lines that would be longer than 80 characters are wrapped. However, existing line-endings are
preserved.
"""

def __init__(
self,
text: str,
trailing_lines: int = 0,
) -> None:
self.text = text
self.trailing_lines = trailing_lines

def rflx_str(self, indent_level: int = 0) -> str:
def wrap_line(line: str) -> str:
commented_section = textwrap.wrap(
f"{OUTPUT_INDENT*indent_level}-- {line}",
width=OUTPUT_WIDTH,
subsequent_indent=f"{OUTPUT_INDENT*indent_level}-- ",
)
return "\n".join(commented_section)

lines = self.text.split("\n")
wrapped_lines = map(wrap_line, lines)
return "\n".join(wrapped_lines) + "\n" + "\n" * self.trailing_lines


def _normalize_name(description_text: str) -> str:
t: dict[str, Union[int, str, None]] = {c: " " for c in string.punctuation + "\n"}
name = description_text.translate(str.maketrans(t))
Expand All @@ -277,8 +380,8 @@ def _normalize_value(value: str) -> tuple[str, int]:
return rflx_hex, len(rflx_hex[3 : len(rflx_hex) - 1].lstrip("0")) * 4
if value.find("-") != -1:
range_upper = value[value.index("-") + 1 :]
return range_upper, int(range_upper).bit_length()
return str(value), int(value).bit_length()
return range_upper, _convert_int_value(range_upper).bit_length()
return str(value), _convert_int_value(value).bit_length()


def _normalize_hex_value(hex_value: str) -> str:
Expand All @@ -299,3 +402,10 @@ def _normalize_hex_value(hex_value: str) -> str:
if re.match(r"^0x[0-9A-Fa-f]+$", hex_value) is not None: # 0xA1A1
return f"16#{hex_value[2:]}#".upper()
fail(f'cannot normalize hex value "{hex_value}"', subsystem=Subsystem.CONVERTER)


def _convert_int_value(value: str) -> int:
try:
return int(value)
except ValueError as e:
fail(f"{e}", subsystem=Subsystem.CONVERTER)
Loading

0 comments on commit 84ea01e

Please sign in to comment.