Skip to content

Commit

Permalink
Merge branch 'main' into chore/prepare_release_v3.8.0
Browse files Browse the repository at this point in the history
  • Loading branch information
Bodong-Yang authored Sep 3, 2024
2 parents 4fed54f + 855ec23 commit d3338b8
Show file tree
Hide file tree
Showing 15 changed files with 2,442 additions and 469 deletions.
2 changes: 1 addition & 1 deletion src/otaclient/boot_control/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ def _load_status_file(self):
_loaded_ota_status = None

# initialize ota_status files if not presented/incompleted/invalid
if not _loaded_ota_status:
if _loaded_ota_status is None:
logger.info(
"ota_status files incompleted/not presented, "
f"initializing and set/store status to {api_types.StatusOta.INITIALIZED.name}..."
Expand Down
112 changes: 78 additions & 34 deletions src/otaclient/boot_control/_firmware_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,43 +42,49 @@

from __future__ import annotations

import logging
import re
from enum import Enum
from typing import Any, List, Literal
from pathlib import Path
from typing import Any, List, Literal, Union

from pydantic import BaseModel, BeforeValidator, GetCoreSchemaHandler
from pydantic_core import CoreSchema, core_schema
import yaml
from pydantic import BaseModel, BeforeValidator
from typing_extensions import Annotated

from otaclient_common.typing import gen_strenum_validator
from otaclient_common.typing import StrOrPath, gen_strenum_validator

logger = logging.getLogger(__name__)


class PayloadType(str, Enum):
UEFI_CAPSULE = "UEFI-CAPSULE"
UEFI_BOOT_APP = "UEFI-BOOT-APP"
BUP = "BUP"


DIGEST_PATTERN = re.compile(r"^(?P<algorithm>[\w\-+ ]+):(?P<digest>[a-zA-Z0-9]+)$")


def _pydantic_str_schema(
cls, source_type: Any, handler: GetCoreSchemaHandler
) -> CoreSchema:
"""Pydantic schema adapter for str."""
return core_schema.no_info_after_validator_function(cls, handler(str))


class DigestValue(str):
class DigestValue(BaseModel):
"""Implementation of digest value schema <algorithm>:<hexstr>."""

def __init__(self, _in: str) -> None:
_in = _in.strip()
if not (ma := DIGEST_PATTERN.match(_in)):
raise ValueError(f"invalid digest value: {_in}")
self.algorithm = ma.group("algorithm")
self.digest = ma.group("digest")
algorithm: str
digest: str

__get_pydantic_core_schema__ = classmethod(_pydantic_str_schema)
@classmethod
def parse(cls, _in: str | DigestValue | Any) -> DigestValue:
if isinstance(_in, DigestValue):
return _in

if isinstance(_in, str):
_in = _in.strip()
if not (ma := DIGEST_PATTERN.match(_in)):
raise ValueError(f"invalid digest value: {_in}")
return DigestValue(
algorithm=ma.group("algorithm"), digest=ma.group("digest")
)
raise TypeError(f"expect instance of str or {cls}, get {type(_in)}")


class NVIDIAFirmwareCompat(BaseModel):
Expand All @@ -95,13 +101,13 @@ def check_compat(self, _tnspec: str) -> bool:
)


class NVIDIAUEFIFirmwareSpec(BaseModel):
# NOTE: let the jetson-uefi module parse the bsp_version
class NVIDIAFirmwareSpec(BaseModel):
# NOTE: let the boot control module parse BSP version
bsp_version: str
firmware_compat: NVIDIAFirmwareCompat


class PayloadFileLocation(str):
class PayloadFileLocation(BaseModel):
"""Specifying the payload file location.
It supports file URL or digest value.
Expand All @@ -113,26 +119,33 @@ class PayloadFileLocation(str):
"""

location_type: Literal["blob", "file"]
location_path: str | DigestValue
location_path: Union[str, DigestValue]

def __init__(self, _in: str) -> None:
if _in.startswith("file://"):
self.location_type = "file"
self.location_path = _in.replace("file://", "", 1)
else:
self.location_type = "blob"
self.location_path = DigestValue(_in)
@classmethod
def parse(cls, _in: str | PayloadFileLocation | Any) -> PayloadFileLocation:
if isinstance(_in, PayloadFileLocation):
return _in

__get_pydantic_core_schema__ = classmethod(_pydantic_str_schema)
if isinstance(_in, str):
if _in.startswith("file://"):
location_type = "file"
location_path = _in.replace("file://", "", 1)
else:
location_type = "blob"
location_path = DigestValue.parse(_in)
return cls(location_type=location_type, location_path=location_path)
raise TypeError(f"expect instance of str of {cls}, get {type(_in)}")


class FirmwarePackage(BaseModel):
"""Metadata of a firmware update package payload."""

payload_name: str
file_location: Annotated[PayloadFileLocation, BeforeValidator(PayloadFileLocation)]
file_location: Annotated[
PayloadFileLocation, BeforeValidator(PayloadFileLocation.parse)
]
type: Annotated[PayloadType, BeforeValidator(gen_strenum_validator(PayloadType))]
digest: DigestValue
digest: Annotated[DigestValue, BeforeValidator(DigestValue.parse)]


class HardwareType(str, Enum):
Expand All @@ -156,7 +169,7 @@ class FirmwareManifest(BaseModel):
]
hardware_series: str
hardware_model: str
firmware_spec: NVIDIAUEFIFirmwareSpec
firmware_spec: NVIDIAFirmwareSpec
firmware_packages: List[FirmwarePackage]

def check_compat(self, _tnspec: str) -> bool:
Expand Down Expand Up @@ -186,3 +199,34 @@ class FirmwareUpdateRequest(BaseModel):

format_version: Literal[1] = 1
firmware_list: List[str]


def load_firmware_package(
*,
firmware_update_request_fpath: StrOrPath,
firmware_manifest_fpath: StrOrPath,
) -> tuple[FirmwareUpdateRequest, FirmwareManifest] | None:
"""Parse firmware update package."""
try:
firmware_update_request = FirmwareUpdateRequest.model_validate(
yaml.safe_load(Path(firmware_update_request_fpath).read_text())
)
except FileNotFoundError:
logger.info("firmware update request file not found, skip firmware update")
return
except Exception as e:
logger.warning(f"invalid request file: {e!r}")
return

try:
firmware_manifest = FirmwareManifest.model_validate(
yaml.safe_load(Path(firmware_manifest_fpath).read_text())
)
except FileNotFoundError:
logger.warning("no firmware manifest file presented, skip firmware update!")
return
except Exception as e:
logger.warning(f"invalid manifest file: {e!r}")
return

return firmware_update_request, firmware_manifest
Loading

0 comments on commit d3338b8

Please sign in to comment.