diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 16f7340..8d27bcf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -20,10 +20,10 @@ jobs: python-version: ${{ matrix.python-version }} - name: Lint with flake8 run: | - pip install cbor2 lxml flake8 + pip install cbor2 lxml pefile flake8 # stop the build if there are Python syntax errors or undefined names flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics - name: Test with pytest run: | - pip install cbor2 lxml pytest + pip install cbor2 lxml pefile pytest pytest diff --git a/setup.py b/setup.py index 749f13c..51c4b1c 100755 --- a/setup.py +++ b/setup.py @@ -59,11 +59,14 @@ "format_spdx.pyi", "format_swid.pyi", "format_uswid.pyi", + "format_pe.pyi", "hash.pyi", "payload.pyi", "purl.pyi", "evidence.pyi", "component.pyi", + "container.pyi", + "container_utils.pyi", "link.pyi", "problem.pyi", "vcs.pyi", diff --git a/uswid/__init__.py b/uswid/__init__.py index aa94f3a..5c71869 100644 --- a/uswid/__init__.py +++ b/uswid/__init__.py @@ -28,6 +28,7 @@ from uswid.format_uswid import uSwidFormatUswid from uswid.format_cyclonedx import uSwidFormatCycloneDX from uswid.format_spdx import uSwidFormatSpdx +from uswid.format_pe import uSwidFormatPe from uswid.vex_document import uSwidVexDocument from uswid.vex_product import uSwidVexProduct from uswid.vex_statement import ( diff --git a/uswid/cli.py b/uswid/cli.py index 93fd424..4bc1abc 100755 --- a/uswid/cli.py +++ b/uswid/cli.py @@ -7,28 +7,18 @@ # # pylint: disable=wrong-import-position,too-many-locals,too-many-statements,too-many-nested-blocks -from enum import IntEnum from collections import defaultdict -from random import choices, randrange from datetime import datetime -from typing import Optional, Any, List, Dict +from typing import Optional, List, Dict, Callable import argparse -import tempfile -import subprocess import socket import json - import os import sys -import shutil -import uuid -import string from importlib import metadata as importlib_metadata from importlib.metadata import PackageNotFoundError -import pefile - sys.path.append(os.path.realpath(".")) from uswid import ( @@ -53,192 +43,32 @@ from uswid.format_uswid import uSwidFormatUswid from uswid.format_cyclonedx import uSwidFormatCycloneDX from uswid.format_spdx import uSwidFormatSpdx +from uswid.format_pe import uSwidFormatPe from uswid.vcs import uSwidVcs from uswid.vex_document import uSwidVexDocument +from uswid.container_utils import container_generate, container_roundtrip -def _adjust_SectionSize(sz, align): - if sz % align: - sz = ((sz + align) // align) * align - return sz - - -def _pe_get_section_by_name(pe: pefile.PE, name: str) -> pefile.SectionStructure: - for sect in pe.sections: - if sect.Name == name.encode().ljust(8, b"\0"): - return sect - return None - - -def _load_efi_pefile(filepath: str) -> uSwidContainer: - """read EFI file using pefile""" - pe = pefile.PE(filepath) - sect = _pe_get_section_by_name(pe, ".sbom") - if not sect: - raise NotSupportedError("PE files have to have an linker-defined .sbom section") - component = uSwidFormatCoswid().load(sect.get_data()) - component.add_source_filename(filepath) - return component - - -def _load_efi_objcopy(filepath: str, objcopy: str) -> uSwidContainer: - """read EFI file using objcopy""" - objcopy_full = shutil.which(objcopy) - if not objcopy_full: - print(f"executable {objcopy} not found") - sys.exit(1) - with tempfile.NamedTemporaryFile( - mode="w+b", prefix="objcopy_", suffix=".bin", delete=True - ) as dst: - try: - # pylint: disable=unexpected-keyword-arg - subprocess.check_output( - [ - objcopy_full, - "-O", - "binary", - "--only-section=.sbom", - filepath, - dst.name, - ], - stderr=subprocess.PIPE, - ) - except subprocess.CalledProcessError as e: - print(e) - sys.exit(1) - component = uSwidFormatCoswid().load(dst.read()) - component.add_source_filename(filepath) - return component - - -def _save_efi_pefile(component: uSwidComponent, filepath: str) -> None: - """modify EFI file using pefile""" - - blob = uSwidFormatCoswid().save(uSwidContainer([component])) - pe = pefile.PE(filepath) - sect = _pe_get_section_by_name(pe, ".sbom") - if not sect: - raise NotSupportedError("PE files have to have an linker-defined .sbom section") - - # can we squeeze the new uSWID blob into the existing space - sect.Misc = len(blob) - if len(blob) <= sect.SizeOfRawData: - pe.set_bytes_at_offset(sect.PointerToRawData, blob) - - # save - pe.write(filepath) - - -def _save_efi_objcopy( - component: uSwidComponent, - filepath: str, - cc: Optional[str], - cflags: str, - objcopy: str, -) -> None: - """modify EFI file using objcopy""" - objcopy_full = shutil.which(objcopy) - if not objcopy_full: - print(f"executable {objcopy} not found") - sys.exit(1) - if not os.path.exists(filepath): - if not cc: - raise NotSupportedError("compiler is required for missing section") - subprocess.run( - [cc, "-x", "c", "-c", "-o", filepath, "/dev/null"] + cflags.split(" "), - check=True, - ) - - # save to file? - try: - blob = uSwidFormatIni().save(uSwidContainer([component])) - except NotSupportedError as e: - print(e) - sys.exit(1) - - with tempfile.NamedTemporaryFile( - mode="wb", prefix="objcopy_", suffix=".bin", delete=True - ) as src: - src.write(blob) - src.flush() - try: - # pylint: disable=unexpected-keyword-arg - subprocess.check_output( - [ - objcopy_full, - "--remove-section=.sbom", - "--add-section", - f".sbom={src.name}", - "--set-section-flags", - ".sbom=contents,alloc,load,readonly,data", - filepath, - ] - ) - except subprocess.CalledProcessError as e: - print(e) - sys.exit(1) - - -class SwidFormat(IntEnum): - """Detected file format""" - - UNKNOWN = 0 - INI = 1 - XML = 2 - USWID = 3 - PE = 4 - JSON = 5 - PKG_CONFIG = 6 - COSWID = 7 - CYCLONE_DX = 8 - SPDX = 9 - VEX = 10 - - -def _detect_format(filepath: str) -> SwidFormat: +def _detect_format(filepath: str) -> Optional[Callable]: if filepath.endswith("bom.json") or filepath.endswith("cdx.json"): - return SwidFormat.CYCLONE_DX + return uSwidFormatCycloneDX() if filepath.endswith("spdx.json"): - return SwidFormat.SPDX + return uSwidFormatSpdx() ext = filepath.rsplit(".", maxsplit=1)[-1].lower() if ext in ["exe", "efi", "o"]: - return SwidFormat.PE + return uSwidFormatPe() if ext in ["uswid", "raw", "bin"]: - return SwidFormat.USWID + return uSwidFormatUswid() if ext in ["coswid", "cbor"]: - return SwidFormat.COSWID + return uSwidFormatCoswid() if ext == "ini": - return SwidFormat.INI + return uSwidFormatIni() if ext == "xml": - return SwidFormat.XML + return uSwidFormatSwid() if ext == "json": - return SwidFormat.JSON - if ext == "pc": - return SwidFormat.PKG_CONFIG - if ext == "vex": - return SwidFormat.VEX - return SwidFormat.UNKNOWN - - -def _type_for_fmt( - fmt: SwidFormat, args: Any, filepath: Optional[str] = None -) -> Optional[Any]: - if fmt == SwidFormat.INI: - return uSwidFormatIni() - if fmt == SwidFormat.COSWID: - return uSwidFormatCoswid() - if fmt == SwidFormat.JSON: return uSwidFormatGoswid() - if fmt == SwidFormat.XML: - return uSwidFormatSwid() - if fmt == SwidFormat.CYCLONE_DX: - return uSwidFormatCycloneDX() - if fmt == SwidFormat.SPDX: - return uSwidFormatSpdx() - if fmt == SwidFormat.PKG_CONFIG: - return uSwidFormatPkgconfig(filepath=filepath) - if fmt == SwidFormat.USWID: - return uSwidFormatUswid(compression=args.compression) # type: ignore + if ext == "pc": + return uSwidFormatPkgconfig() return None @@ -352,202 +182,6 @@ def _container_merge_from_filepath( ) -def _roundtrip(container: uSwidContainer, verbose: bool = False) -> None: - - # collect for analysis - try: - component: uSwidComponent = container[0] - except IndexError: - print("no default component") - return - - # convert to each format and back again - for base in [ - uSwidFormatCoswid(), - uSwidFormatIni(), - uSwidFormatCycloneDX(), - uSwidFormatGoswid(), - uSwidFormatPkgconfig(), - uSwidFormatSpdx(), - uSwidFormatSwid(), - uSwidFormatUswid(), - ]: - - # proxy - base.verbose = verbose - - # save - try: - blob: bytes = base.save(container) - except NotImplementedError: - continue - - # load - try: - container_new = base.load(blob) - except NotImplementedError: - continue - try: - component_new = container_new[0] - except IndexError: - print(f"no default component for {base.name}") - continue - - # compare the old and the new - differences: List[Dict[str, Any]] = [] - for key in [ - "tag_id", - "tag_version", - "type", - "software_name", - "software_version", - "version_scheme", - "summary", - "product", - "colloquial_version", - "revision", - "edition", - "persistent_id", - "activation_status", - "cpe", - ]: - if getattr(component, key) != getattr(component_new, key): - differences.append( - { - "class": "uSwidComponent", - "property": key, - "old": getattr(component, key), - "new": getattr(component_new, key), - } - ) - - # payloads - for payload in component.payloads: - - # check still exists - payload_new = component_new.get_payload_by_name(payload.name) - if not payload_new: - differences.append( - { - "class": "uSwidPayload", - "name": payload.name, - } - ) - continue - - # check values - for key in [ - "name", - "size", - ]: - if getattr(payload, key) != getattr(payload_new, key): - differences.append( - { - "class": "uSwidPayload", - "property": key, - "old": getattr(payload, key), - "new": getattr(payload_new, key), - } - ) - - # entities - for entity in component.entities: - - # check still exists - for role in entity.roles: - entity_new = component_new.get_entity_by_role(role) - if not entity_new: - differences.append( - { - "class": "uSwidEntity", - "name": role, - } - ) - continue - - # check values - for key in [ - "name", - "regid", - ]: - if getattr(entity, key) != getattr(entity_new, key): - differences.append( - { - "class": "uSwidEntity", - "property": key, - "old": getattr(entity, key), - "new": getattr(entity_new, key), - } - ) - - # link - for link in component.links: - # check still exists - link_new = component_new.get_link_by_rel(link.rel) - if not link_new: - differences.append( - { - "class": "uSwidLink", - "name": str(link.rel), - } - ) - continue - - # check values - for key in [ - "href", - "rel", - ]: - if getattr(link, key) != getattr(link_new, key): - differences.append( - { - "class": "uSwidLink", - "property": key, - "old": getattr(link, key), - "new": getattr(link_new, key), - } - ) - - # evidence - for evidence in component.evidences: - # check still exists - evidence_new = component_new.get_evidence_by_rel(evidence.rel) - if not evidence_new: - differences.append( - { - "class": "uSwidEvidence", - "name": evidence.rel, - } - ) - continue - - # check values - for key in [ - "date", - "device_id", - ]: - if getattr(evidence, key) != getattr(evidence_new, key): - differences.append( - { - "class": "uSwidEvidence", - "property": key, - "old": getattr(evidence, key), - "new": getattr(evidence_new, key), - } - ) - - # show differences - total: float = 22 - print(f"{base.name}: { 100.0 / float(total) * (total - len(differences)):.0f}%") - for dif in differences: - try: - print( - f" - FAILURE {dif['class']}.{dif['property']}: {dif['old']}->{dif['new']}" - ) - except KeyError: - print(f" - FAILURE {dif['class']} [{dif['name']}] -> None") - - def main(): """Main entrypoint""" parser = argparse.ArgumentParser( @@ -659,13 +293,12 @@ def main(): for path in args.fallback_path: for basename in os.listdir(path): filepath = os.path.join(path, basename) - fmt = _detect_format(filepath) - if fmt == SwidFormat.UNKNOWN: - continue - base = _type_for_fmt(fmt, args, filepath=filepath) + base = _detect_format(filepath) if not base: continue base.verbose = args.verbose + if isinstance(base, uSwidFormatPkgconfig): + base.filepath = filepath with open(filepath, "rb") as f: blob: bytes = f.read() for component in base.load(blob, path=filepath): @@ -708,13 +341,9 @@ def main(): component = container_fallback.get_by_link_href(remote_path) if component: filepath = component.source_filenames[0] - fmt = _detect_format(filepath) - if fmt == SwidFormat.UNKNOWN: - continue - base = _type_for_fmt(fmt, args, filepath=filepath) + base = _detect_format(filepath) if not base: - print(f"{fmt} no type for format") - sys.exit(1) + continue base.verbose = args.verbose _container_merge_from_filepath( container, base, filepath, dirpath=dirpath, fixup=args.fixup @@ -747,84 +376,39 @@ def main(): print("Use uswid --help for command line arguments") sys.exit(1) - # generate 1000 plausible components, each with: - # - unique tag-id GUID - # - unique software-name of size 4-30 chars - # - colloquial-version from a random selection of 10 SHA-1 hashes - # - edition from a random SHA-1 hash - # - semantic version of size 3-8 chars - # - entity from a random selection of 10 entities + # generate plausible components if args.generate: - tree_hashes: List[str] = [] - entities: List[uSwidEntity] = [] - for _ in range(10): - tree_hashes.append("".join(choices("0123456789abcdef", k=40))) - for i in range(10): - entity = uSwidEntity() - entity.name = "Entity#" + str(i) - entity.regid = "com.entity" + str(i) - entity.roles = [uSwidEntityRole.TAG_CREATOR] - entities.append(entity) - for i in range(1000): - component = uSwidComponent() - component.tag_id = str(uuid.uuid4()) - component.software_name = "".join( - choices(string.ascii_lowercase, k=randrange(4, 30)) - ) - component.software_version = "1." + "".join( - choices("123456789", k=randrange(1, 6)) - ) - component.colloquial_version = tree_hashes[randrange(len(tree_hashes))] - component.edition = "".join(choices("0123456789abcdef", k=40)) - component.version_scheme = uSwidVersionScheme.MULTIPARTNUMERIC - component.add_entity(entities[randrange(len(entities))]) - container.append(component) + container_generate(container) # collect data here for filepath in load_filepaths: try: - fmt = _detect_format(filepath) - if fmt == SwidFormat.UNKNOWN: - print(f"{filepath} has unknown extension, using uSWID") - fmt = SwidFormat.USWID - if fmt == SwidFormat.PE: - if args.objcopy: - container_tmp = _load_efi_objcopy(filepath, objcopy=args.objcopy) - else: - container_tmp = _load_efi_pefile(filepath) - for component in container_tmp: - component_new = container.merge(component) - if component_new: - print( - "{} was merged into existing component {}".format( - filepath, component_new.tag_id - ) - ) - elif fmt == SwidFormat.VEX: + if filepath.endswith(".vex"): with open(filepath, "rb") as f: container.add_vex_document( uSwidVexDocument(json.loads(f.read().decode())) ) - elif fmt in [ - SwidFormat.INI, - SwidFormat.JSON, - SwidFormat.COSWID, - SwidFormat.USWID, - SwidFormat.XML, - SwidFormat.SPDX, - SwidFormat.CYCLONE_DX, - SwidFormat.PKG_CONFIG, - ]: - base = _type_for_fmt(fmt, args, filepath=filepath) - base.verbose = args.verbose - if not base: - print(f"{fmt} no type for format") - sys.exit(1) - _container_merge_from_filepath( - container, base, filepath, fixup=args.fixup - ) else: - print(f"{filepath} has unknown format, ignoring") + base = _detect_format(filepath) + if not base: + print(f"{filepath} has unknown extension, using uSWID") + base = uSwidFormatUswid() + base.verbose = args.verbose + if isinstance(base, uSwidFormatPe): + base.objcopy = args.objcopy + with open(filepath, "rb") as f: + for component in base.load(f.read(), filepath): + component_new = container.merge(component) + if component_new: + print( + "{} was merged into existing component {}".format( + filepath, component_new.tag_id + ) + ) + else: + _container_merge_from_filepath( + container, base, filepath, fixup=args.fixup + ) except FileNotFoundError: print(f"{filepath} does not exist") sys.exit(1) @@ -893,7 +477,7 @@ def main(): # test the container with different SBOM formats if args.roundtrip: - _roundtrip(container, verbose=args.verbose) + container_roundtrip(container, verbose=args.verbose) # add any missing evidence for component in container: @@ -915,38 +499,22 @@ def main(): print(f"{component}") for filepath in save_filepaths: try: - fmt = _detect_format(filepath) - if fmt == SwidFormat.PE: - component_pe: Optional[uSwidComponent] = container.get_default() - if not component_pe: - print("cannot save PE when no default component") - sys.exit(1) - if args.objcopy: - _save_efi_objcopy( - component_pe, filepath, args.cc, args.cflags, args.objcopy - ) - else: - _save_efi_pefile(component_pe, filepath) - elif fmt in [ - SwidFormat.INI, - SwidFormat.COSWID, - SwidFormat.JSON, - SwidFormat.XML, - SwidFormat.USWID, - SwidFormat.CYCLONE_DX, - SwidFormat.SPDX, - ]: - base = _type_for_fmt(fmt, args) - base.verbose = args.verbose - if not base: - print(f"{fmt} no type for format") - sys.exit(1) - blob = base.save(container) - with open(filepath, "wb") as f: - f.write(blob) - else: + base = _detect_format(filepath) + if not base: print(f"{filepath} extension is not supported") sys.exit(1) + base.verbose = args.verbose + if isinstance(base, uSwidFormatUswid): + base.compression = args.compression + if isinstance(base, uSwidFormatPe): + base.filepath = filepath + base.objcopy = args.objcopy + base.cc = args.cc + base.cflags = args.cflags + blob = base.save(container) + if blob: + with open(filepath, "wb") as f: + f.write(blob) except NotSupportedError as e: print(e) sys.exit(1) diff --git a/uswid/container_utils.py b/uswid/container_utils.py new file mode 100644 index 0000000..962b588 --- /dev/null +++ b/uswid/container_utils.py @@ -0,0 +1,258 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- +# +# Copyright (C) 2022 Richard Hughes +# +# SPDX-License-Identifier: BSD-2-Clause-Patent +# +# pylint: disable=too-few-public-methods,protected-access + +from typing import List, Dict, Any +from random import choices, randrange +import uuid +import string + +from .container import uSwidContainer +from .component import uSwidComponent +from .entity import uSwidEntity, uSwidEntityRole +from .enums import uSwidVersionScheme + +from .format_coswid import uSwidFormatCoswid +from .format_ini import uSwidFormatIni +from .format_goswid import uSwidFormatGoswid +from .format_pkgconfig import uSwidFormatPkgconfig +from .format_swid import uSwidFormatSwid +from .format_uswid import uSwidFormatUswid +from .format_cyclonedx import uSwidFormatCycloneDX +from .format_spdx import uSwidFormatSpdx + + +def container_generate(container: uSwidContainer) -> None: + """generate 1000 plausible components, each with: + - unique tag-id GUID + - unique software-name of size 4-30 chars + - colloquial-version from a random selection of 10 SHA-1 hashes + - edition from a random SHA-1 hash + - semantic version of size 3-8 chars + - entity from a random selection of 10 entities + """ + tree_hashes: List[str] = [] + entities: List[uSwidEntity] = [] + for _ in range(10): + tree_hashes.append("".join(choices("0123456789abcdef", k=40))) + for i in range(10): + entity = uSwidEntity() + entity.name = "Entity#" + str(i) + entity.regid = "com.entity" + str(i) + entity.roles = [uSwidEntityRole.TAG_CREATOR] + entities.append(entity) + for i in range(1000): + component = uSwidComponent() + component.tag_id = str(uuid.uuid4()) + component.software_name = "".join( + choices(string.ascii_lowercase, k=randrange(4, 30)) + ) + component.software_version = "1." + "".join( + choices("123456789", k=randrange(1, 6)) + ) + component.colloquial_version = tree_hashes[randrange(len(tree_hashes))] + component.edition = "".join(choices("0123456789abcdef", k=40)) + component.version_scheme = uSwidVersionScheme.MULTIPARTNUMERIC + component.add_entity(entities[randrange(len(entities))]) + container.append(component) + + +def container_roundtrip(container: uSwidContainer, verbose: bool = False) -> None: + """rountrip the container into a few different SBOM formats""" + + try: + component: uSwidComponent = container[0] # type:ignore[index] + except IndexError: + print("no default component") + return + + # convert to each format and back again + for base in [ + uSwidFormatCoswid(), + uSwidFormatIni(), + uSwidFormatCycloneDX(), + uSwidFormatGoswid(), + uSwidFormatPkgconfig(), + uSwidFormatSpdx(), + uSwidFormatSwid(), + uSwidFormatUswid(), + ]: + + # proxy + base.verbose = verbose + + # save + try: + blob: bytes = base.save(container) + except NotImplementedError: + continue + + # load + try: + container_new = base.load(blob) + except NotImplementedError: + continue + try: + component_new = container_new[0] # type:ignore[index] + except IndexError: + print(f"no default component for {base.name}") + continue + + # compare the old and the new + differences: List[Dict[str, Any]] = [] + for key in [ + "tag_id", + "tag_version", + "type", + "software_name", + "software_version", + "version_scheme", + "summary", + "product", + "colloquial_version", + "revision", + "edition", + "persistent_id", + "activation_status", + "cpe", + ]: + if getattr(component, key) != getattr(component_new, key): + differences.append( + { + "class": "uSwidComponent", + "property": key, + "old": getattr(component, key), + "new": getattr(component_new, key), + } + ) + + # payloads + for payload in component.payloads: + + # check still exists + payload_new = component_new.get_payload_by_name(payload.name) + if not payload_new: + differences.append( + { + "class": "uSwidPayload", + "name": payload.name, + } + ) + continue + + # check values + for key in [ + "name", + "size", + ]: + if getattr(payload, key) != getattr(payload_new, key): + differences.append( + { + "class": "uSwidPayload", + "property": key, + "old": getattr(payload, key), + "new": getattr(payload_new, key), + } + ) + + # entities + for entity in component.entities: + + # check still exists + for role in entity.roles: + entity_new = component_new.get_entity_by_role(role) + if not entity_new: + differences.append( + { + "class": "uSwidEntity", + "name": role, + } + ) + continue + + # check values + for key in [ + "name", + "regid", + ]: + if getattr(entity, key) != getattr(entity_new, key): + differences.append( + { + "class": "uSwidEntity", + "property": key, + "old": getattr(entity, key), + "new": getattr(entity_new, key), + } + ) + + # link + for link in component.links: + # check still exists + link_new = component_new.get_link_by_rel(link.rel) + if not link_new: + differences.append( + { + "class": "uSwidLink", + "name": str(link.rel), + } + ) + continue + + # check values + for key in [ + "href", + "rel", + ]: + if getattr(link, key) != getattr(link_new, key): + differences.append( + { + "class": "uSwidLink", + "property": key, + "old": getattr(link, key), + "new": getattr(link_new, key), + } + ) + + # evidence + for evidence in component.evidences: + # check still exists + evidence_new = component_new.get_evidence_by_rel(evidence.rel) + if not evidence_new: + differences.append( + { + "class": "uSwidEvidence", + "name": evidence.rel, + } + ) + continue + + # check values + for key in [ + "date", + "device_id", + ]: + if getattr(evidence, key) != getattr(evidence_new, key): + differences.append( + { + "class": "uSwidEvidence", + "property": key, + "old": getattr(evidence, key), + "new": getattr(evidence_new, key), + } + ) + + # show differences + total: float = 22 + print(f"{base.name}: { 100.0 / float(total) * (total - len(differences)):.0f}%") + for dif in differences: + try: + print( + f" - FAILURE {dif['class']}.{dif['property']}: {dif['old']}->{dif['new']}" + ) + except KeyError: + print(f" - FAILURE {dif['class']} [{dif['name']}] -> None") diff --git a/uswid/format.py b/uswid/format.py index a931d36..a58e04b 100644 --- a/uswid/format.py +++ b/uswid/format.py @@ -25,6 +25,7 @@ class uSwidFormatBase: * ``uSwidFormatPkgconfig`` (``.load`` only) * ``uSwidFormatSwid`` * ``uSwidFormatUswid`` + * ``uSwidFormatPe`` """ def __init__(self, name: str, verbose: bool = False) -> None: diff --git a/uswid/format_pe.py b/uswid/format_pe.py new file mode 100644 index 0000000..20ffa64 --- /dev/null +++ b/uswid/format_pe.py @@ -0,0 +1,175 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- +# +# Copyright (C) 2021 Richard Hughes +# +# SPDX-License-Identifier: BSD-2-Clause-Patent +# +# pylint: disable=too-few-public-methods,protected-access + +from typing import Optional + +import sys +import shutil +import subprocess +import tempfile +import os + +import pefile + +from .container import uSwidContainer +from .format import uSwidFormatBase +from .errors import NotSupportedError +from .format_coswid import uSwidFormatCoswid + + +def _adjust_SectionSize(sz, align): + if sz % align: + sz = ((sz + align) // align) * align + return sz + + +def _pe_get_section_by_name(pe: pefile.PE, name: str) -> pefile.SectionStructure: + for sect in pe.sections: + if sect.Name == name.encode().ljust(8, b"\0"): + return sect + return None + + +def _load_efi_objcopy(filepath: str, objcopy: str) -> uSwidContainer: + """read EFI file using objcopy""" + objcopy_full = shutil.which(objcopy) + if not objcopy_full: + print(f"executable {objcopy} not found") + sys.exit(1) + with tempfile.NamedTemporaryFile( + mode="w+b", prefix="objcopy_", suffix=".bin", delete=True + ) as dst: + try: + # pylint: disable=unexpected-keyword-arg + subprocess.check_output( + [ + objcopy_full, + "-O", + "binary", + "--only-section=.sbom", + filepath, + dst.name, + ], + stderr=subprocess.PIPE, + ) + except subprocess.CalledProcessError as e: + print(e) + sys.exit(1) + component = uSwidFormatCoswid().load(dst.read()) + component.add_source_filename(filepath) + return component + + +def _save_efi_objcopy( + container: uSwidContainer, + filepath: str, + cc: Optional[str], + cflags: str, + objcopy: str, +) -> None: + """modify EFI file using objcopy""" + objcopy_full = shutil.which(objcopy) + if not objcopy_full: + print(f"executable {objcopy} not found") + sys.exit(1) + if not os.path.exists(filepath): + if not cc: + raise NotSupportedError("compiler is required for missing section") + subprocess.run( + [cc, "-x", "c", "-c", "-o", filepath, "/dev/null"] + cflags.split(" "), + check=True, + ) + + # save to file? + try: + blob = uSwidFormatCoswid().save(container) + except NotSupportedError as e: + print(e) + sys.exit(1) + + with tempfile.NamedTemporaryFile( + mode="wb", prefix="objcopy_", suffix=".bin", delete=True + ) as src: + src.write(blob) + src.flush() + try: + # pylint: disable=unexpected-keyword-arg + subprocess.check_output( + [ + objcopy_full, + "--remove-section=.sbom", + "--add-section", + f".sbom={src.name}", + "--set-section-flags", + ".sbom=contents,alloc,load,readonly,data", + filepath, + ] + ) + except subprocess.CalledProcessError as e: + print(e) + sys.exit(1) + + +class uSwidFormatPe(uSwidFormatBase): + """PE file""" + + def __init__(self, filepath: Optional[str] = None) -> None: + """Initializes uSwidFormatPe""" + uSwidFormatBase.__init__(self, "PE") # type:ignore[call-arg] + self.objcopy: Optional[str] = None + self.cc: Optional[str] = None + self.cflags: Optional[str] = None + self.filepath: Optional[str] = filepath + + def load(self, blob: bytes, path: Optional[str] = None) -> uSwidContainer: + + if not path: + raise NotSupportedError("cannot load when no path") + if self.objcopy: + return _load_efi_objcopy(path, objcopy=self.objcopy) + + pe = pefile.PE(data=blob) + sect = _pe_get_section_by_name(pe, ".sbom") + if not sect: + raise NotSupportedError( + "PE files have to have an linker-defined .sbom section" + ) + container = uSwidFormatCoswid().load(sect.get_data()) + if self.filepath: + for component in container: + component.add_source_filename(self.filepath) + return container + + def save(self, container: uSwidContainer) -> bytes: + + if not self.filepath: + raise NotSupportedError("cannot save when no path") + if self.objcopy: + # if not self.cflags: + # raise NotSupportedError("cannot save when no cflags") + _save_efi_objcopy( + container, self.filepath, self.cc, self.cflags or "", self.objcopy + ) + return b"" + + blob = uSwidFormatCoswid().save(container) + pe = pefile.PE(self.filepath) + sect = _pe_get_section_by_name(pe, ".sbom") + if not sect: + raise NotSupportedError( + "PE files have to have an linker-defined .sbom section" + ) + + # can we squeeze the new uSWID blob into the existing space + sect.Misc = len(blob) + if len(blob) <= sect.SizeOfRawData: + pe.set_bytes_at_offset(sect.PointerToRawData, blob) + + # save + return pe.write()