From 1e4588af07892359282b9adefa49658e44994d3c Mon Sep 17 00:00:00 2001 From: Milan Hauth Date: Sun, 26 May 2024 18:30:21 +0200 Subject: [PATCH] fetch all certs from host DRAFT --- aia.py | 325 ++++++++++++++++++++++++++++++++++++++++++++++++------- setup.py | 3 +- 2 files changed, 285 insertions(+), 43 deletions(-) diff --git a/aia.py b/aia.py index fd29eab..e11c5f0 100644 --- a/aia.py +++ b/aia.py @@ -1,4 +1,5 @@ import os +import sys from contextlib import ExitStack from functools import lru_cache, partial import logging @@ -9,8 +10,12 @@ from urllib.request import urlopen, Request from urllib.parse import urlsplit +# pyopenssl +import OpenSSL + # https://cryptography.io/en/latest/x509/ from cryptography import x509 +from cryptography.hazmat.primitives.serialization import Encoding import certifi @@ -80,6 +85,8 @@ def get_cn_of_name(name): def get_ca_issuers_of_cert(cert): + # convert cert from pyopenssl to cryptography + cert = cert.to_cryptography() try: aia_extension = cert.extensions.get_extension_for_class(x509.AuthorityInformationAccess) except x509.extensions.ExtensionNotFound: @@ -120,31 +127,42 @@ def __init__( """ self.user_agent = user_agent self.cache_dir = cache_dir - self._context = ssl.SSLContext() # TLS (don't check broken chain) - self._context.load_default_certs() - - # Trusted certificates whitelist in dict format like: - # {"RFC4514 string": b"DER certificate contents"} - self._trusted = { - openssl_get_cert_info(ca_der)["subject"]: ca_der - for ca_der in self._context.get_ca_certs(True) - } - + self._context = OpenSSL.SSL.Context(method=OpenSSL.SSL.TLS_CLIENT_METHOD) + self._context.load_verify_locations(cafile=certifi.where()) self._cadata_from_host_regex = dict() @CachedMethod - def get_host_cert(self, host): + def get_host_cert_chain(self, host, timeout=5): """ - Get the DER (binary) certificate for the target host - without checking it (leaf certificate). + Get the certificate chain from the target host, + without checking it, without fetching missing certs. """ - logger.debug(f"Downloading {host} certificate (TLS)") + logger.debug(f"Downloading TLS certificate chain from {host}") port = 443 if ':' in host: host, port = host.split(':') - with socket.create_connection((host, port)) as sock: - with self._context.wrap_socket(sock, server_hostname=host) as ss: - return ss.getpeercert(True) + port = int(port) + # https://stackoverflow.com/a/67212703/10440128 + conn = OpenSSL.SSL.Connection( + self._context, + socket=socket.socket(socket.AF_INET, socket.SOCK_STREAM) + ) + conn.settimeout(timeout) + # NOTE this block can throw OpenSSL.SSL.Error ... + conn.connect((host, port)) + conn.setblocking(1) + conn.set_tlsext_host_name(host.encode()) + conn.do_handshake() + full_cert_chain = conn.get_peer_cert_chain() + verified_cert_chain = conn.get_verified_chain() + conn.close() + if len(verified_cert_chain) == len(full_cert_chain): + # rest_cert_chain is empty + # this does not mean that the chain is valid + # the server can return only 1 cert + return verified_cert_chain, None + rest_cert_chain = full_cert_chain[len(verified_cert_chain):] + return verified_cert_chain, rest_cert_chain @CachedMethod def _get_ca_issuer_cert(self, url): @@ -162,53 +180,216 @@ def _get_ca_issuer_cert(self, url): else: cache_path = None if cache_path and os.path.exists(cache_path): + logger.debug("reading cache", cache_path) with open(cache_path, "rb") as f: - return f.read() # read cache + cert_der = f.read() # read cache + #cert = x509.load_der_x509_certificate(cert_der) no. here we need pyopenssl cert + cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert_der) + return cert logger.debug(f"Downloading CA issuer certificate at {url}") req = Request(url=url, headers={"User-Agent": self.user_agent}) with urlopen(req) as resp: if resp.status != 200: raise AIADownloadError(f"HTTP {resp.status} (CA Issuer Cert.)") - data = resp.read() + cert_der = resp.read() # check again if cache_path exists. can have multiple writers if cache_path and not os.path.exists(cache_path): + logger.debug("writing cache", cache_path) os.makedirs(os.path.dirname(cache_path), exist_ok=True) with open(cache_path, "wb") as f: - f.write(data) # write cache - return data + f.write(cert_der) # write cache + #cert = x509.load_der_x509_certificate(cert_der) + cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert_der) + return cert - def aia_chase(self, host): + def aia_chase(self, host, timeout=5): """ - Generator of the certificate chain from a host, + Get the certificate chain for host, up to (and including) the root certificate. - The result is a list a DER bytestring certificate, - whose first item is the host certificate and the next entries - are the intermediary certificates. + The result is a tuple of + 0 = cert_chain + 1 = chain_is_valid + 2 = missing_certs: extra certs that had to be fetched to verify the chain + + The first cert in cert_chain is the host certificate, + the next certs are the intermediary certificates, + the last cert is the root certificate. """ - der_cert = self.get_host_cert(host) + + verified_cert_chain, rest_cert_chain = self.get_host_cert_chain(host, timeout) + + # TODO what to do with rest_cert_chain + + # debug + def print_chain(cert_chain): + if not cert_chain: + print(" (empty)") + return + for (idx, cert) in enumerate(cert_chain): + print(f" {idx} subject: {cert.get_subject()}") + print(f" issuer: {cert.get_issuer()})") + print(f' fingerprint: {cert.digest("sha1")}') + + print("verified_cert_chain"); print_chain(verified_cert_chain) + print("rest_cert_chain"); print_chain(rest_cert_chain) + + # no. when the server sends only 1 cert + # then rest_cert_chain is empty, but the chain can be invalid + #if not rest_cert_chain: + # # full chain is valid, no missing certs were fetched + # return verified_cert_chain, None + + # the first cert (leaf cert) is always in verified_cert_chain + if not verified_cert_chain: + # no certs were received + return None, None, None + + # chase: fetch missing certs + + #store = OpenSSL.crypto.X509Store() + # local cert store so we can add certs + store = self._context.get_cert_store() + + # verified_cert_chain[0:-1] certs are verified + # verified_cert_chain[-1] cert is not verified + + # https://github.com/pyca/pyopenssl/pull/948 + #issuer_certs = [] + + max_chain_depth = 123 + + for verify_chain_idx in range(max_chain_depth): + + cert = verified_cert_chain[-1] # not verified + + aia_ca_issuers = get_ca_issuers_of_cert(cert) + print("aia_ca_issuers", aia_ca_issuers) + assert len(aia_ca_issuers) > 0 + issuer_cert = self._get_ca_issuer_cert(aia_ca_issuers[0]) + print("issuer_cert subject", issuer_cert.get_subject()) + print("issuer_cert issuer ", issuer_cert.get_issuer()) + #issuer_certs.append(issuer_cert) + issuer_certs = [issuer_cert] + + # verify this cert + #while True: + #for i in range(2): + #print("verify try", i) + ctx = OpenSSL.crypto.X509StoreContext(store, cert, issuer_certs) + try: + ctx.verify_certificate() + print("cert is valid. full chain is valid, no missing certs were fetched") + # cert is valid + # full chain is valid, no missing certs were fetched + #verified_cert_chain.append(issuer_cert.to_cryptography()) + verified_cert_chain.append(issuer_cert) + verified_cert_chain = list(map(lambda c: c.to_cryptography(), verified_cert_chain)) + return verified_cert_chain, True, issuer_certs + except OpenSSL.crypto.X509StoreContextError as exc: + print("exc.args", exc.args) + print("exc.certificate", exc.certificate) + print("exc.errors", exc.errors) + print("exc str", str(exc)) + + raise + + ''' + if exc.errors[0] == 20: # unable to get local issuer certificate + # still cannot verify this cert + raise + elif exc.errors[0] == 18: # self-signed certificate + # this root cert is not in ca-bundle.crt + # TODO let the user decide whether to trust this root cert + raise + else: + raise + ''' + + # ? + #verified_cert_chain.append(issuer_cert.to_cryptography()) + + ''' + # unable to get local issuer certificate + print("todo fetch") + + #if exc.args[0] != "unable to get local issuer certificate": + # raise exc + + # assert exc.args[0] == "unable to get local issuer certificate" + # exc.errors == [20, 0, 'unable to get local issuer certificate'] + + # exc.args ('self-signed certificate',) + # exc.errors [18, 0, 'self-signed certificate'] + + #pass + + # cert: unable to get local issuer certificate + # fetch issuer cert + + #cert_issuer = cert.get_issuer() + #cert_fingerprint = cert.digest("sha1") + + # TODO remove openssl_get_cert_info + # use only get_ca_issuers_of_cert + aia_ca_issuers = get_ca_issuers_of_cert(cert) + print("aia_ca_issuers", aia_ca_issuers) + assert len(aia_ca_issuers) > 0 + issuer_cert = self._get_ca_issuer_cert(aia_ca_issuers[0]) + print("issuer_cert", issuer_cert) + issuer_certs.append(issuer_cert) + # retry validation + ''' + + ############### + + ''' + + print("mmkay") + + raise 123 + + # get_peer_cert_chain + # get_verified_chain + # add_extra_chain_cert(certobj) + + # TODO validate chain, step by step + + der_cert = cert_chain[0] + der_cert_was_fetched = False # Traverse the AIA path until it gets a self-signed certificate # or a certificate without a "parent" issuer URI reference while True: cert_dict = openssl_get_cert_info(der_cert) cert_issuer = cert_dict["issuer"] - if cert_dict["subject"] == cert_issuer: # Self-signed (root) cert + print("cert_issuer", cert_issuer, cert_issuer in self._trusted) + if cert_dict["subject"] == cert_issuer: # Self-signed (root) cert # is this enough? if cert_issuer not in self._trusted: raise InvalidCAError("Root in AIA but not in trusted list") + # is this enough? logger.debug(f"Found a self-signed (root) certificate for " f"{host} in AIA, and it's also in trusted list!") - yield self._trusted[cert_issuer] + yield (self._trusted[cert_issuer], False) return - yield der_cert - if not cert_dict["aia_ca_issuers"]: + yield (der_cert, der_cert_was_fetched) + + if cert_issuer in self._trusted: + # is this enough? + yield (self._trusted[cert_issuer], False) + return + + if not cert_dict["aia_ca_issuers"]: # FIXME wrong condition? if cert_issuer not in self._trusted: raise InvalidCAError("Root not in trusted database") logger.debug(f"Found the {host} certificate root!") - yield self._trusted[cert_issuer] + yield (self._trusted[cert_issuer], False) return logger.debug(f"Found another {host} certificate chain entry (AIA)") + print(f"Found another {host} certificate chain entry (AIA)") der_cert = self._get_ca_issuer_cert(cert_dict["aia_ca_issuers"][0]) + der_cert_was_fetched = True + ''' def validate_certificate_chain(self, certs): """ @@ -223,7 +404,24 @@ def validate_certificate_chain(self, certs): # https://cryptography.io/en/latest/x509/verification/ - with open(certifi.where(), "rb") as pems: + """ + # prefer os.environ.get("SSL_CERT_FILE") + # or "SYSTEM_CERTIFICATE_PATH" + # /nix/store/ad8lpasryg34dv93h3n359bri176pj56-nss-cacert-3.98/etc/ssl/certs/ca-bundle.crt + + #print("certifi.where()", certifi.where()) + # /nix/store/ad8lpasryg34dv93h3n359bri176pj56-nss-cacert-3.98/etc/ssl/certs/ca-bundle.crt + + ca_bundle_path = ( + os.environ.get("SSL_CERT_FILE") or + os.environ.get("SYSTEM_CERTIFICATE_PATH") or + certifi.where() + ) + """ + + ca_bundle_path = certifi.where() + + with open(ca_bundle_path, "rb") as pems: store = x509.verification.Store(x509.load_pem_x509_certificates(pems.read())) builder = x509.verification.PolicyBuilder().store(store) @@ -242,31 +440,67 @@ def validate_certificate_chain(self, certs): #except x509.UnsupportedGeneralNameType: # raise ssl.SSLError - def cadata_from_host(self, host): + def cadata_from_host(self, host, **kwargs): """ Get the certification chain, apart from the leaf node, as joined PEM (ASCII string in base64 with extra delimiters) certificates in a single string, to be used in a SSLContext. """ - cadata, host_regex = self.cadata_and_host_regex_from_host(host) + cadata, host_regex = self.cadata_and_host_regex_from_host(host, **kwargs) return cadata - def cadata_and_host_regex_from_host(self, host): + def cadata_and_host_regex_from_host(self, host, only_missing=False, timeout=5): """ Get the certification chain and the host regex. Note: The host regex only matches lowercase hostnames. The host regex also matches ports like example.com:12345. + Set only_missing to True to get only the missing certificates. See also cadata_from_host """ host = host.lower() + print("cadata_and_host_regex_from_host", host) + for host_regex in self._cadata_from_host_regex: + print("host_regex", host_regex) if host_regex.fullmatch(host): + print("cadata_and_host_regex_from_host read cache") # read cache cadata = self._cadata_from_host_regex[host_regex] return cadata, host_regex - der_certs = list(self.aia_chase(host)) + print("cadata_and_host_regex_from_host cache miss") + + cert_chain, chain_is_valid, missing_certs = self.aia_chase(host, timeout) + + assert chain_is_valid + + target_cert = cert_chain[0] + #target_cert = target_cert.to_cryptography() + #print("target_cert", repr(target_cert), dir(target_cert), target_cert.public_bytes(cryptography.hazmat.primitives.serialization.Encoding.PEM)) + + cadata = "\n".join(map(lambda c: c.public_bytes(Encoding.PEM).decode("ascii"), cert_chain)) + + ''' + target_name = get_cn_of_name(target_cert.subject).lower() # can be "*.example.com" + #target_name = target_cert.get_subject() + print("target_name", repr(target_name)) + + # host can have port. target_name has no port + # port is between 0 and 65535 inclusive + host_regex = target_name.replace(".", "\\.").replace("*", ".*") + "(?::[0-9]{1,5})?" + host_regex = re.compile(host_regex) + + #print("cert_chain", cert_chain) + + #der_cert_tuples = list(self.aia_chase(host)) + ''' + + ''' + der_certs = [t[0] for t in der_cert_tuples] + + der_cert_was_fetched_list = [t[1] for t in der_cert_tuples] + print("der_cert_was_fetched_list", der_cert_was_fetched_list) # TODO move up to aia_chase # load cert as soon as possible to avoid double-parsing @@ -275,10 +509,17 @@ def cadata_and_host_regex_from_host(self, host): logger.info(f"Checking the {host} certificate chain...") self.validate_certificate_chain(certs) logger.info(f"The {host} certificate chain is valid!") - cadata = "".join(ssl.DER_cert_to_PEM_cert(dc) for dc in der_certs[1:]) + + if only_missing: + cadata = "".join(ssl.DER_cert_to_PEM_cert(t[0]) for t in der_cert_tuples[1:] if t[1]) + else: + cadata = "".join(ssl.DER_cert_to_PEM_cert(dc) for dc in der_certs[1:]) target_cert = certs[0] + ''' + target_name = get_cn_of_name(target_cert.subject).lower() # can be "*.example.com" + # host can have port. target_name has no port # port is between 0 and 65535 inclusive host_regex = target_name.replace(".", "\\.").replace("*", ".*") + "(?::[0-9]{1,5})?" @@ -295,19 +536,19 @@ def cadata_and_host_regex_from_host(self, host): return cadata, host_regex - def cadata_from_url(self, url): + def cadata_from_url(self, url, **kwargs): """Façade to the ``cadata_from_host`` method.""" split_result = urlsplit(url) - return self.cadata_from_host(split_result.netloc) + return self.cadata_from_host(split_result.netloc, **kwargs) - def ssl_context_from_host(self, host, purpose=ssl.Purpose.SERVER_AUTH): + def ssl_context_from_host(self, host, purpose=ssl.Purpose.SERVER_AUTH, **kwargs): """ SSLContext instance for a single host name that gets (and validates) its certificate chain from AIA. """ return ssl.create_default_context( purpose=purpose, - cadata=self.cadata_from_host(host), + cadata=self.cadata_from_host(host, **kwargs), ) def ssl_context_from_url(self, url, purpose=ssl.Purpose.SERVER_AUTH): diff --git a/setup.py b/setup.py index ce72570..6c75946 100755 --- a/setup.py +++ b/setup.py @@ -34,6 +34,7 @@ include_package_data=True, python_requires=">=3.6", install_requires=[ + "pyopenssl", "cryptography", "certifi", ], @@ -43,7 +44,7 @@ "Intended Audience :: Developers", "Intended Audience :: System Administrators", "License :: OSI Approved :: BSD License", - "Operating System :: OS Independent", # Assuming OpenSSL is available + "Operating System :: OS Independent", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.6", "Programming Language :: Python :: 3.7",