Skip to content

Commit

Permalink
... FIXME invalid CA certificate ...
Browse files Browse the repository at this point in the history
  • Loading branch information
milahu committed Jun 1, 2024
1 parent 5aa93be commit 8d492f7
Show file tree
Hide file tree
Showing 2 changed files with 241 additions and 26 deletions.
87 changes: 76 additions & 11 deletions aia.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,15 @@ def __init__(
self._context = OpenSSL.SSL.Context(method=OpenSSL.SSL.TLS_CLIENT_METHOD)
self._context.load_verify_locations(cafile=certifi.where())
self._cadata_from_host_regex = dict()
self._trusted_root_certs = list()

@CachedMethod
def get_host_cert_chain(self, host, timeout=5):
"""
Get the certificate chain from the target host,
without checking it, without fetching missing certs.
"""
logger.debug(f"Downloading TLS certificate chain from {host}")
logger.debug(f"Downloading TLS certificate chain from https://{host}")
port = 443
if ':' in host:
host, port = host.split(':')
Expand Down Expand Up @@ -215,7 +216,8 @@ def _read_cert_cache(self, url_parsed):
if row:
logger.debug(f"found cert in cache_db: {url}")
cert_der = row[0]
# no. here we need pyopenssl cert # TODO why?
# no. here we need pyopenssl cert
# for OpenSSL.crypto.X509StoreContext
#cert = x509.load_der_x509_certificate(cert_der)
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert_der)
return cert
Expand All @@ -225,7 +227,8 @@ def _read_cert_cache(self, url_parsed):
logger.debug(f"found cert in cache_dir: {url}")
with open(cache_path, "rb") as f:
cert_der = f.read()
# no. here we need pyopenssl cert # TODO why?
# no. here we need pyopenssl cert
# for OpenSSL.crypto.X509StoreContext
#cert = x509.load_der_x509_certificate(cert_der)
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert_der)
return cert
Expand Down Expand Up @@ -261,7 +264,8 @@ def _load_cert_from_bytes(self, cert_bytes):
# TODO pyopenssl or cryptography
# try to load DER = ASN1 format
try:
# no. here we need pyopenssl cert # TODO why?
# no. here we need pyopenssl cert
# for OpenSSL.crypto.X509StoreContext
#cert = x509.load_der_x509_certificate(cert_der)
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert_bytes)
return cert
Expand All @@ -283,7 +287,8 @@ def _load_cert_from_bytes(self, cert_bytes):
certs = pkcs7.load_der_pkcs7_certificates(cert_bytes)
assert len(certs) == 1 # TODO
cert = certs[0]
# here we need pyopenssl cert # TODO why?
# here we need pyopenssl cert
# for OpenSSL.crypto.X509StoreContext
cert = OpenSSL.crypto.X509.from_cryptography(cert)
return cert
except ValueError:
Expand All @@ -298,7 +303,8 @@ def _load_cert_from_bytes(self, cert_bytes):
certs = pkcs7.load_pem_pkcs7_certificates(cert_bytes)
assert len(certs) == 1 # TODO
cert = certs[0]
# here we need pyopenssl cert # TODO why?
# here we need pyopenssl cert
# for OpenSSL.crypto.X509StoreContext
cert = OpenSSL.crypto.X509.from_cryptography(cert)
return cert
except ValueError:
Expand Down Expand Up @@ -351,14 +357,66 @@ def _get_ca_issuer_cert(self, url):
self._write_cert_cache(url_parsed, cert)
return cert

def add_trusted_root_cert(self, cert):
"""
if isinstance(cert, OpenSSL.crypto.X509):
# convert cert from pyopenssl to cryptography
cert = cert.to_cryptography()
assert isinstance(cert, cryptography.x509.Certificate)
"""
if isinstance(cert, cryptography.x509.Certificate):
# convert cert from cryptography to pyopenssl
# for OpenSSL.crypto.X509StoreContext
cert = OpenSSL.crypto.X509.from_cryptography(cert)
assert isinstance(cert, OpenSSL.crypto.X509)
try:
ext_bc = cert.to_cryptography().extensions.get_extension_for_class(x509.BasicConstraints)
except cryptography.x509.extensions.ExtensionNotFound:
raise ValueError("must be a CA cert")
if ext_bc.value.ca != True:
raise ValueError("must be a CA cert")
if cert.get_issuer() != cert.get_subject():
raise ValueError("must be a self-signed cert")
# no. this only works with cryptography certs
"""
# note: this check uses cert.__eq__ ("semantic equality") so no need for pointer equality
if cert in self._trusted_root_certs:
return False
"""
cert_digest = cert.digest("sha256")
for c in self._trusted_root_certs:
if c.digest("sha256") == cert_digest:
return False # cert already was added
logger.debug(f"adding trusted root cert {cert}")
self._trusted_root_certs.append(cert)
return True # cert was added

def remove_trusted_root_cert(self, cert):
if isinstance(cert, cryptography.x509.Certificate):
# convert cert from cryptography to pyopenssl
# for OpenSSL.crypto.X509StoreContext
cert = OpenSSL.crypto.X509.from_cryptography(cert)
assert isinstance(cert, OpenSSL.crypto.X509)
cert_digest = cert.digest("sha256")
len1 = len(self._trusted_root_certs)
self._trusted_root_certs = list(filter(
lambda c: c.digest("sha256") != cert_digest,
self._trusted_root_certs.append
))
len2 = len(self._trusted_root_certs)
return len1 != len2 # return True if cert was removed

def aia_chase(self, host, timeout=5, max_chain_depth=100):
"""
Get the certificate chain for host,
up to (and including) the root certificate.
The result is a tuple of
0 = verified_cert_chain
1 = missing_certs: extra certs that had to be fetched to verify the chain
1 = missing_certs
missing_certs are the extra certs
that had to be fetched to verify the chain.
The first cert in cert_chain is the host certificate,
the next certs are the intermediary certificates,
Expand Down Expand Up @@ -423,8 +481,14 @@ def print_chain(cert_chain):

#store = OpenSSL.crypto.X509Store()
# local cert store so we can add temporary certs
# https://www.pyopenssl.org/en/stable/api/crypto.html#x509store-objects
store = self._context.get_cert_store()

# add trusted root certs
for cert in self._trusted_root_certs:
# FIXME invalid CA certificate @ ctx.verify_certificate()
store.add_cert(cert)

# verified_cert_chain[0:-1] certs are verified
# verified_cert_chain[-1] cert is not verified

Expand Down Expand Up @@ -457,7 +521,8 @@ def print_chain(cert_chain):
#print("verify try", i)
# https://github.com/pyca/pyopenssl/pull/948
ctx = OpenSSL.crypto.X509StoreContext(store, cert, missing_certs)
#ctx = OpenSSL.crypto.X509StoreContext(store, cert, [issuer_cert])
# no. this adds *untrusted* certs
#ctx = OpenSSL.crypto.X509StoreContext(store, cert, self._trusted_root_certs + missing_certs)
try:
ctx.verify_certificate()
print("cert is valid. full chain is valid, no missing certs were fetched")
Expand All @@ -476,7 +541,7 @@ def print_chain(cert_chain):
continue
if exc.errors[0] == 19:
# exc.errors [19, 1, 'self-signed certificate in certificate chain']
print("chain ends with untrusted root cert")
print("chain ends with untrusted root cert. hint: aia_session.add_trusted_root_cert(cert)")
raise
print("exc.args", exc.args)
print("exc.certificate", exc.certificate)
Expand Down Expand Up @@ -684,8 +749,8 @@ def cadata_and_host_regex_from_host(self, host, only_missing=False, timeout=5):

def cadata_from_url(self, url, **kwargs):
"""Façade to the ``cadata_from_host`` method."""
split_result = urlsplit(url)
return self.cadata_from_host(split_result.netloc, **kwargs)
url_parsed = urlsplit(url)
return self.cadata_from_host(url_parsed.netloc, **kwargs)

def ssl_context_from_host(self, host, purpose=ssl.Purpose.SERVER_AUTH, **kwargs):
"""
Expand Down
Loading

0 comments on commit 8d492f7

Please sign in to comment.