diff --git a/sigstore/_cli.py b/sigstore/_cli.py index 093c848c8..915fbef97 100644 --- a/sigstore/_cli.py +++ b/sigstore/_cli.py @@ -31,6 +31,7 @@ from sigstore._internal.fulcio.client import DEFAULT_FULCIO_URL, FulcioClient from sigstore._internal.rekor.client import DEFAULT_REKOR_URL, RekorClient from sigstore._internal.tuf import TrustUpdater +from sigstore._utils import PEMCert from sigstore.oidc import ( DEFAULT_OAUTH_ISSUER_URL, STAGING_OAUTH_ISSUER_URL, @@ -818,7 +819,7 @@ def _collect_verification_state( with file.open(mode="rb", buffering=0) as io: materials = VerificationMaterials( input_=io, - cert_pem=cert_pem, + cert_pem=PEMCert(cert_pem), signature=signature, rekor_entry=entry, offline=args.offline, diff --git a/sigstore/_internal/ctfe.py b/sigstore/_internal/ctfe.py index e0eaf0e32..cc244981a 100644 --- a/sigstore/_internal/ctfe.py +++ b/sigstore/_internal/ctfe.py @@ -25,7 +25,7 @@ from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import ec, rsa -from sigstore._utils import key_id, load_pem_public_key +from sigstore._utils import KeyID, key_id, load_pem_public_key class CTKeyringError(Exception): @@ -70,7 +70,7 @@ def add(self, key_pem: bytes) -> None: key = load_pem_public_key(key_pem) self._keyring[key_id(key)] = key - def verify(self, *, key_id: bytes, signature: bytes, data: bytes) -> None: + def verify(self, *, key_id: KeyID, signature: bytes, data: bytes) -> None: """ Verify that `signature` is a valid signature for `data`, using the key identified by `key_id`. diff --git a/sigstore/_internal/fulcio/client.py b/sigstore/_internal/fulcio/client.py index 9386e3d27..9fcd66ae3 100644 --- a/sigstore/_internal/fulcio/client.py +++ b/sigstore/_internal/fulcio/client.py @@ -45,6 +45,8 @@ ) from pydantic import BaseModel, Field, validator +from sigstore._utils import B64Str + logger = logging.getLogger(__name__) DEFAULT_FULCIO_URL = "https://fulcio.sigstore.dev" @@ -193,9 +195,9 @@ def __init__(self, url: str, session: requests.Session) -> None: def _serialize_cert_request(req: CertificateSigningRequest) -> str: data = { - "certificateSigningRequest": base64.b64encode( - req.public_bytes(serialization.Encoding.PEM) - ).decode() + "certificateSigningRequest": B64Str( + base64.b64encode(req.public_bytes(serialization.Encoding.PEM)).decode() + ) } return json.dumps(data) diff --git a/sigstore/_internal/merkle.py b/sigstore/_internal/merkle.py index 9c07cb140..ef57364f8 100644 --- a/sigstore/_internal/merkle.py +++ b/sigstore/_internal/merkle.py @@ -26,6 +26,7 @@ import struct from typing import List, Tuple +from sigstore._utils import HexStr from sigstore.transparency import LogEntry @@ -123,9 +124,9 @@ def verify_merkle_inclusion(entry: LogEntry) -> None: leaf_hash, inclusion_proof.hashes[:inner], inclusion_proof.log_index ) - calc_hash: str = _chain_border_right( - intermediate_result, inclusion_proof.hashes[inner:] - ).hex() + calc_hash: HexStr = HexStr( + _chain_border_right(intermediate_result, inclusion_proof.hashes[inner:]).hex() + ) if calc_hash != inclusion_proof.root_hash: raise InvalidInclusionProofError( diff --git a/sigstore/_internal/oidc/oauth.py b/sigstore/_internal/oidc/oauth.py index 0f5049352..582268b75 100644 --- a/sigstore/_internal/oidc/oauth.py +++ b/sigstore/_internal/oidc/oauth.py @@ -28,6 +28,7 @@ import uuid from typing import Any, Dict, List, Optional, cast +from sigstore._utils import B64Str from sigstore.oidc import IdentityError, Issuer logger = logging.getLogger(__name__) @@ -170,13 +171,13 @@ def __init__(self, client_id: str, client_secret: str, issuer: Issuer): self._state = str(uuid.uuid4()) self._nonce = str(uuid.uuid4()) - self.code_verifier = ( + self.code_verifier = B64Str( base64.urlsafe_b64encode(os.urandom(32)).rstrip(b"=").decode() ) @property def code_challenge(self) -> str: - return ( + return B64Str( base64.urlsafe_b64encode( hashlib.sha256(self.code_verifier.encode()).digest() ) diff --git a/sigstore/_internal/rekor/client.py b/sigstore/_internal/rekor/client.py index 01862ac1d..ebd884557 100644 --- a/sigstore/_internal/rekor/client.py +++ b/sigstore/_internal/rekor/client.py @@ -32,7 +32,7 @@ from sigstore._internal.ctfe import CTKeyring from sigstore._internal.tuf import TrustUpdater -from sigstore._utils import base64_encode_pem_cert +from sigstore._utils import B64Str, base64_encode_pem_cert from sigstore.transparency import LogEntry logger = logging.getLogger(__name__) @@ -137,9 +137,9 @@ def get( def post( self, - b64_artifact_signature: str, + b64_artifact_signature: B64Str, sha256_artifact_hash: str, - b64_cert: str, + b64_cert: B64Str, ) -> LogEntry: """ Submit a new entry for inclusion in the Rekor log. @@ -202,9 +202,9 @@ def post( "apiVersion": "0.0.1", "spec": { "signature": { - "content": base64.b64encode(signature).decode(), + "content": B64Str(base64.b64encode(signature).decode()), "publicKey": { - "content": base64_encode_pem_cert(certificate), + "content": B64Str(base64_encode_pem_cert(certificate)), }, }, "data": { diff --git a/sigstore/_internal/sct.py b/sigstore/_internal/sct.py index 2937aa53d..ac4036780 100644 --- a/sigstore/_internal/sct.py +++ b/sigstore/_internal/sct.py @@ -36,7 +36,7 @@ CTKeyringError, CTKeyringLookupError, ) -from sigstore._utils import key_id +from sigstore._utils import DERCert, KeyID, key_id logger = logging.getLogger(__name__) @@ -50,7 +50,7 @@ def _pack_signed_entry( # # [0]: opaque ASN.1Cert<1..2^24-1> pack_format = "!BBB{cert_der_len}s" - cert_der = cert.public_bytes(encoding=serialization.Encoding.DER) + cert_der = DERCert(cert.public_bytes(encoding=serialization.Encoding.DER)) elif sct.entry_type == LogEntryType.PRE_CERTIFICATE: if not issuer_key_id or len(issuer_key_id) != 32: raise InvalidSctError("API misuse: issuer key ID missing") @@ -62,7 +62,7 @@ def _pack_signed_entry( pack_format = "!32sBBB{cert_der_len}s" # Precertificates must have their SCT list extension filtered out. - cert_der = cert.tbs_precertificate_bytes + cert_der = DERCert(cert.tbs_precertificate_bytes) fields.append(issuer_key_id) else: raise InvalidSctError(f"unknown SCT log entry type: {sct.entry_type!r}") @@ -85,14 +85,14 @@ def _pack_signed_entry( def _pack_digitally_signed( sct: SignedCertificateTimestamp, cert: Certificate, - issuer_key_id: Optional[bytes], + issuer_key_id: Optional[KeyID], ) -> bytes: """ Packs the contents of `cert` (and some pieces of `sct`) into a structured blob, one that forms the signature body of the "digitally-signed" struct for an SCT. - The format of the digitally signed data is described in IETF's RFC 6962. + The format of the digitaly signed data is described in IETF's RFC 6962. """ # No extensions are currently specified, so we treat the presence @@ -190,7 +190,7 @@ def verify_sct( # to expose this trivial single member, so we use the `log_id` # attribute directly. ct_keyring.verify( - key_id=sct.log_id, signature=sct.signature, data=digitally_signed + key_id=KeyID(sct.log_id), signature=sct.signature, data=digitally_signed ) except CTKeyringLookupError as exc: # We specialize this error case, since it usually indicates one of diff --git a/sigstore/_utils.py b/sigstore/_utils.py index 1fc6c0399..c147cc7d9 100644 --- a/sigstore/_utils.py +++ b/sigstore/_utils.py @@ -21,7 +21,7 @@ import base64 import hashlib import sys -from typing import IO, Union +from typing import IO, NewType, Union from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import ec, rsa @@ -32,8 +32,30 @@ else: from importlib import resources + PublicKey = Union[rsa.RSAPublicKey, ec.EllipticCurvePublicKey] +HexStr = NewType("HexStr", str) +""" +A newtype for `str` objects that contain hexadecimal strings (e.g. `ffabcd00ff`). +""" +B64Str = NewType("B64Str", str) +""" +A newtype for `str` objects that contain base64 encoded strings. +""" +PEMCert = NewType("PEMCert", str) +""" +A newtype for `str` objects that contain PEM-encoded certificates. +""" +DERCert = NewType("DERCert", bytes) +""" +A newtype for `bytes` objects that contain DER-encoded certificates. +""" +KeyID = NewType("KeyID", bytes) +""" +A newtype for `bytes` objects that contain a key id. +""" + class InvalidKey(Exception): """ @@ -61,15 +83,17 @@ def load_pem_public_key(key_pem: bytes) -> PublicKey: return key -def base64_encode_pem_cert(cert: Certificate) -> str: +def base64_encode_pem_cert(cert: Certificate) -> B64Str: """ Returns a string containing a base64-encoded PEM-encoded X.509 certificate. """ - return base64.b64encode(cert.public_bytes(serialization.Encoding.PEM)).decode() + return B64Str( + base64.b64encode(cert.public_bytes(serialization.Encoding.PEM)).decode() + ) -def key_id(key: PublicKey) -> bytes: +def key_id(key: PublicKey) -> KeyID: """ Returns an RFC 6962-style "key ID" for the given public key. @@ -80,7 +104,7 @@ def key_id(key: PublicKey) -> bytes: format=serialization.PublicFormat.SubjectPublicKeyInfo, ) - return hashlib.sha256(public_bytes).digest() + return KeyID(hashlib.sha256(public_bytes).digest()) def sha256_streaming(io: IO[bytes]) -> bytes: diff --git a/sigstore/sign.py b/sigstore/sign.py index 51c95baf0..97e1e9f4d 100644 --- a/sigstore/sign.py +++ b/sigstore/sign.py @@ -71,7 +71,7 @@ from sigstore._internal.rekor.client import RekorClient from sigstore._internal.sct import verify_sct from sigstore._internal.tuf import TrustUpdater -from sigstore._utils import sha256_streaming +from sigstore._utils import B64Str, HexStr, PEMCert, sha256_streaming from sigstore.transparency import LogEntry logger = logging.getLogger(__name__) @@ -166,7 +166,7 @@ def sign( artifact_signature = private_key.sign( input_digest, ec.ECDSA(Prehashed(hashes.SHA256())) ) - b64_artifact_signature = base64.b64encode(artifact_signature).decode() + b64_artifact_signature = B64Str(base64.b64encode(artifact_signature).decode()) # Prepare inputs b64_cert = base64.b64encode( @@ -175,17 +175,19 @@ def sign( # Create the transparency log entry entry = self._rekor.log.entries.post( - b64_artifact_signature=b64_artifact_signature, + b64_artifact_signature=B64Str(b64_artifact_signature), sha256_artifact_hash=input_digest.hex(), - b64_cert=b64_cert.decode(), + b64_cert=B64Str(b64_cert.decode()), ) logger.debug(f"Transparency log entry created with index: {entry.log_index}") return SigningResult( - input_digest=input_digest.hex(), - cert_pem=cert.public_bytes(encoding=serialization.Encoding.PEM).decode(), - b64_signature=b64_artifact_signature, + input_digest=HexStr(input_digest.hex()), + cert_pem=PEMCert( + cert.public_bytes(encoding=serialization.Encoding.PEM).decode() + ), + b64_signature=B64Str(b64_artifact_signature), log_entry=entry, ) @@ -195,17 +197,17 @@ class SigningResult(BaseModel): Represents the artifacts of a signing operation. """ - input_digest: str + input_digest: HexStr """ The hex-encoded SHA256 digest of the input that was signed for. """ - cert_pem: str + cert_pem: PEMCert """ The PEM-encoded public half of the certificate used for signing. """ - b64_signature: str + b64_signature: B64Str """ The base64-encoded signature. """ diff --git a/sigstore/transparency.py b/sigstore/transparency.py index ca6e7a5b5..3854e1fd1 100644 --- a/sigstore/transparency.py +++ b/sigstore/transparency.py @@ -24,6 +24,8 @@ from pydantic import BaseModel, Field, StrictInt, StrictStr, validator from securesystemslib.formats import encode_canonical +from sigstore._utils import B64Str + @dataclass(frozen=True) class LogEntry: @@ -43,7 +45,7 @@ class LogEntry: Not present for `LogEntry` instances loaded from Sigstore bundles. """ - body: str + body: B64Str """ The base64-encoded body of the transparency log entry. """ @@ -71,7 +73,7 @@ class LogEntry: Only present for entries retrieved from online logs. """ - signed_entry_timestamp: str + signed_entry_timestamp: B64Str """ The base64-encoded Signed Entry Timestamp (SET) for this log entry. """ diff --git a/sigstore/verify/models.py b/sigstore/verify/models.py index 37e1cd0ba..59033fce0 100644 --- a/sigstore/verify/models.py +++ b/sigstore/verify/models.py @@ -34,7 +34,12 @@ from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import Bundle from sigstore._internal.rekor import RekorClient -from sigstore._utils import base64_encode_pem_cert, sha256_streaming +from sigstore._utils import ( + B64Str, + PEMCert, + base64_encode_pem_cert, + sha256_streaming, +) from sigstore.transparency import LogEntry, LogInclusionProof logger = logging.getLogger(__name__) @@ -178,7 +183,7 @@ def __init__( self, *, input_: IO[bytes], - cert_pem: str, + cert_pem: PEMCert, signature: bytes, offline: bool = False, rekor_entry: LogEntry | None, @@ -220,7 +225,7 @@ def from_bundle( certs = bundle.verification_material.x509_certificate_chain.certificates if len(certs) == 0: raise InvalidMaterials("expected non-empty certificate chain in bundle") - cert_pem = ( + cert_pem = PEMCert( load_der_x509_certificate(certs[0].raw_bytes) .public_bytes(Encoding.PEM) .decode() @@ -243,19 +248,21 @@ def from_bundle( ) entry = LogEntry( uuid=None, - body=base64.b64encode(tlog_entry.canonicalized_body).decode(), + body=B64Str(base64.b64encode(tlog_entry.canonicalized_body).decode()), integrated_time=tlog_entry.integrated_time, log_id=tlog_entry.log_id.key_id.hex(), log_index=tlog_entry.log_index, inclusion_proof=inclusion_proof, - signed_entry_timestamp=base64.b64encode( - tlog_entry.inclusion_promise.signed_entry_timestamp - ).decode(), + signed_entry_timestamp=B64Str( + base64.b64encode( + tlog_entry.inclusion_promise.signed_entry_timestamp + ).decode() + ), ) return cls( input_=input_, - cert_pem=cert_pem, + cert_pem=PEMCert(cert_pem), signature=signature, offline=offline, rekor_entry=entry, @@ -311,8 +318,10 @@ def rekor_entry(self, client: RekorClient) -> LogEntry: "apiVersion": "0.0.1", "spec": { "signature": { - "content": base64.b64encode(self.signature).decode(), - "publicKey": {"content": base64_encode_pem_cert(self.certificate)}, + "content": B64Str(base64.b64encode(self.signature).decode()), + "publicKey": { + "content": B64Str(base64_encode_pem_cert(self.certificate)) + }, }, "data": { "hash": {"algorithm": "sha256", "value": self.input_digest.hex()} diff --git a/sigstore/verify/verifier.py b/sigstore/verify/verifier.py index 53bcd8a5c..123e8f126 100644 --- a/sigstore/verify/verifier.py +++ b/sigstore/verify/verifier.py @@ -43,6 +43,7 @@ from sigstore._internal.rekor.client import RekorClient from sigstore._internal.set import InvalidSetError, verify_set from sigstore._internal.tuf import TrustUpdater +from sigstore._utils import B64Str, HexStr from sigstore.verify.models import InvalidRekorEntry as InvalidRekorEntryError from sigstore.verify.models import RekorEntryMissing as RekorEntryMissingError from sigstore.verify.models import ( @@ -66,12 +67,12 @@ class LogEntryMissing(VerificationFailure): "The transparency log has no entry for the given verification materials" ) - signature: str + signature: B64Str """ The signature present during lookup failure, encoded with base64. """ - artifact_hash: str + artifact_hash: HexStr """ The artifact hash present during lookup failure, encoded as a hex string. """ @@ -235,7 +236,7 @@ def verify( entry = materials.rekor_entry(self._rekor) except RekorEntryMissingError: return LogEntryMissing( - signature=base64.b64encode(materials.signature).decode(), + signature=B64Str(base64.b64encode(materials.signature).decode()), artifact_hash=materials.input_digest.hex(), ) except InvalidRekorEntryError: