Skip to content

Commit

Permalink
... fix invalid CA certificate ...
Browse files Browse the repository at this point in the history
  • Loading branch information
milahu committed Jun 1, 2024
1 parent 8d492f7 commit 8298b7f
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 27 deletions.
29 changes: 26 additions & 3 deletions aia.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from cryptography import x509
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import pkcs7
from cryptography.hazmat.primitives import hashes

import certifi

Expand Down Expand Up @@ -122,6 +123,27 @@ def openssl_get_cert_info(cert_der):
return cert_info



def print_cert(cert, label=None):
if label:
print(label + ":")
if isinstance(cert, cryptography.x509.Certificate):
# cryptography cert
# https://cryptography.io/en/latest/x509/reference/
print(f" subject: {cert.subject}")
print(f" issuer: {cert.issuer})")
print(f" fingerprint: {cert.fingerprint(hashes.SHA256())}")
return
if isinstance(cert, OpenSSL.crypto.X509):
# pyopenssl cert
print(f" subject: {cert.get_subject()}")
print(f" issuer: {cert.get_issuer()})")
print(f' fingerprint: {cert.digest("sha256")}')
return
raise ValueError("unknown cert type {type(cert)}")



class AIASession:

def __init__(
Expand Down Expand Up @@ -364,6 +386,7 @@ def add_trusted_root_cert(self, cert):
cert = cert.to_cryptography()
assert isinstance(cert, cryptography.x509.Certificate)
"""
print_cert(cert, "add_trusted_root_cert cert")
if isinstance(cert, cryptography.x509.Certificate):
# convert cert from cryptography to pyopenssl
# for OpenSSL.crypto.X509StoreContext
Expand All @@ -387,7 +410,8 @@ def add_trusted_root_cert(self, cert):
for c in self._trusted_root_certs:
if c.digest("sha256") == cert_digest:
return False # cert already was added
logger.debug(f"adding trusted root cert {cert}")
digest_hex = cert_digest # .decode("ascii").replace(":", "").lower()
logger.debug(f"adding trusted root cert {digest_hex}")
self._trusted_root_certs.append(cert)
return True # cert was added

Expand Down Expand Up @@ -443,7 +467,7 @@ def print_chain(cert_chain):
for (idx, cert) in enumerate(cert_chain):
print(f" {idx} subject: {cert.get_subject()}")
print(f" issuer: {cert.get_issuer()})")
print(f' fingerprint: {cert.digest("sha1")}')
print(f' fingerprint: {cert.digest("sha256")}')

print("verified_cert_chain"); print_chain(verified_cert_chain)
print("rest_cert_chain"); print_chain(rest_cert_chain)
Expand Down Expand Up @@ -486,7 +510,6 @@ def print_chain(cert_chain):

# add trusted root certs
for cert in self._trusted_root_certs:
# FIXME invalid CA certificate @ ctx.verify_certificate()
store.add_cert(cert)

# verified_cert_chain[0:-1] certs are verified
Expand Down
97 changes: 73 additions & 24 deletions test/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@ def create_cert(name, issuer_cert=None, issuer_key=None, issuer_cert_url=None, i

print(f"creating cert {repr(name)}")

# https://cryptography.io/en/latest/x509/tutorial/
# https://cryptography.io/en/latest/x509/reference/#x-509-certificate-builder
# https://stackoverflow.com/questions/56285000/python-cryptography-create-a-certificate-signed-by-an-existing-ca-and-export
# https://gist.github.com/major/8ac9f98ae8b07f46b208

is_root = issuer_cert is None

Expand All @@ -99,6 +101,8 @@ def create_cert(name, issuer_cert=None, issuer_key=None, issuer_cert_url=None, i

issuer_name = subject_name if is_root else issuer_cert.subject

issuer_key = key if is_root else issuer_key

cert = x509.CertificateBuilder()

cert = cert.subject_name(subject_name)
Expand All @@ -108,12 +112,46 @@ def create_cert(name, issuer_cert=None, issuer_key=None, issuer_cert_url=None, i
cert = cert.not_valid_before(datetime.datetime.utcnow())
cert = cert.not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=3650))

# FIXME invalid CA certificate @ ctx.verify_certificate()
cert = cert.add_extension(
x509.SubjectKeyIdentifier.from_public_key(key.public_key()),
critical=False,
)

# https://stackoverflow.com/a/72320618/10440128
#if not is_leaf: # no. certificate signature failure
if is_root:
#if is_root: # no. invalid CA certificate @ cert1

if not is_leaf:
cert = cert.add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True)
cert = cert.add_extension(
x509.KeyUsage(
digital_signature=True,
content_commitment=False,
key_encipherment=False,
data_encipherment=False,
key_agreement=False,
key_cert_sign=True,
crl_sign=True,
encipher_only=False,
decipher_only=False,
),
critical=True,
)
else:
cert = cert.add_extension(
x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
issuer_cert.extensions.get_extension_for_class(x509.SubjectKeyIdentifier).value
),
critical=False,
)

if is_leaf:
cert = cert.add_extension(
x509.ExtendedKeyUsage([
x509.ExtendedKeyUsageOID.CLIENT_AUTH,
x509.ExtendedKeyUsageOID.SERVER_AUTH,
]),
critical=False,
)

if issuer_cert_url:
# add AIA extension
Expand All @@ -126,7 +164,9 @@ def create_cert(name, issuer_cert=None, issuer_key=None, issuer_cert_url=None, i
),
]), critical=False)

cert = cert.sign(key, hashes.SHA256(), default_backend())
# no. certificate signature failure
#cert = cert.sign(key, hashes.SHA256(), default_backend())
cert = cert.sign(issuer_key, hashes.SHA256(), default_backend())

return cert, key

Expand Down Expand Up @@ -178,8 +218,6 @@ def run_test(tmpdir):
# create certs
# TODO refactor ... create_cert_chain

# FIXME invalid CA certificate @ ctx.verify_certificate()

cert0, key0 = create_cert("root cert")
with open(f"{server_root}/cert0", "wb") as f:
# PEM format
Expand Down Expand Up @@ -298,33 +336,31 @@ def handle_exit():
print("creating aia_session")
aia_session = aia.AIASession()

def print_chain(cert_chain):
if not cert_chain:
print(" (empty)")
return
for (idx, cert) in enumerate(cert_chain):
print(f" {idx} subject: {cert.get_subject()}")
print(f" issuer: {cert.get_issuer()})")
print(f' fingerprint: {cert.digest("sha1")}')

def print_cert(cert, label=None):
def print_cert(cert, label=None, indent=""):
if label:
print(label + ":")
print(indent + label + ":")
if isinstance(cert, cryptography.x509.Certificate):
# cryptography cert
# https://cryptography.io/en/latest/x509/reference/
print(f" subject: {cert.subject}")
print(f" issuer: {cert.issuer})")
print(f" fingerprint: {cert.fingerprint(hashes.SHA256())}")
print(indent + f" subject: {cert.subject}")
print(indent + f" issuer: {cert.issuer})")
print(indent + f" fingerprint: {cert.fingerprint(hashes.SHA256())}")
return
if isinstance(cert, OpenSSL.crypto.X509):
# pyopenssl cert
print(f" subject: {cert.get_subject()}")
print(f" issuer: {cert.get_issuer()})")
print(f' fingerprint: {cert.digest("sha256")}')
print(indent + f" subject: {cert.get_subject()}")
print(indent + f" issuer: {cert.get_issuer()})")
print(indent + f' fingerprint: {cert.digest("sha256")}')
return
raise ValueError("unknown cert type {type(cert)}")

def print_chain(cert_chain):
if not cert_chain:
print(" (empty)")
return
for (idx, cert) in enumerate(cert_chain):
print_cert(cert, f"cert {idx}", " ")

print("aia_session.aia_chase ...")

print("-" * 80)
Expand Down Expand Up @@ -365,17 +401,24 @@ def print_cert(cert, label=None):

test_name = "aia_session.add_trusted_root_cert with non-root cert"
print(f"{test_name} ...")
print_cert(cert1, "cert1")
try:
aia_session.add_trusted_root_cert(cert1)
#raise ValueError("must be a CA cert")
#raise ValueError("must be a self-signed cert")
except ValueError as exc:
assert str(exc) == "must be a CA cert"
expected_errors = [
"must be a CA cert",
"must be a self-signed cert",
]
assert str(exc) in expected_errors
print(f"{test_name} ok")

print("-" * 80)

test_name = "aia_session.add_trusted_root_cert"
print(f"{test_name} ...")
print_cert(cert0, "cert0")
assert aia_session.add_trusted_root_cert(cert0) == True
assert aia_session.add_trusted_root_cert(cert0) == False # already exists
print(f"{test_name} ok")
Expand All @@ -398,9 +441,15 @@ def print_cert(cert, label=None):
print("FIXME got unexpected exception:")
print("exc.args", exc.args)
print("exc.certificate", exc.certificate)
print_cert(exc.certificate, "exc.certificate")
print("exc.errors", exc.errors)
print("exc str", str(exc))
print("exc dir", dir(exc))
cert_path = f"/run/user/{os.getuid()}/python-aia-invalid-ca-cert.pem"
print("writing", cert_path)
cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, exc.certificate)
with open(cert_path, "wb") as f:
f.write(cert_pem)
raise
"""
FIXME got unexpected exception:
Expand Down

0 comments on commit 8298b7f

Please sign in to comment.