From 340aa86d33122392dd1d28ab634584230ebf240f Mon Sep 17 00:00:00 2001 From: Milan Hauth Date: Sat, 25 May 2024 18:45:02 +0200 Subject: [PATCH] add option cert_cache --- README.md | 6 ++++++ aia.py | 27 ++++++++++++++++++++++++--- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 6197062..d289d42 100644 --- a/README.md +++ b/README.md @@ -62,6 +62,12 @@ context = aia_session.ssl_context_from_url(url) response = urlopen(url, context=context) ``` +To cache the downloaded intermediary certificates on disk: + +```py +aia_session = AIASession(cert_cache="/tmp/aia-certs") +``` + The context methods also helps when working with HTTP client libraries. For example, with [`requests`](http://python-requests.org/): diff --git a/aia.py b/aia.py index 9958d05..e504743 100644 --- a/aia.py +++ b/aia.py @@ -1,3 +1,4 @@ +import os from contextlib import ExitStack from functools import lru_cache, partial import logging @@ -107,8 +108,14 @@ def openssl_get_cert_info(cert_der): class AIASession: - def __init__(self, user_agent=DEFAULT_USER_AGENT): + def __init__(self, user_agent=DEFAULT_USER_AGENT, cert_cache=None): + """ + Create a new session. + cert_cache is a directory path, + where downloaded intermediary certificates are stored. + """ self.user_agent = user_agent + self.cert_cache = cert_cache self._context = ssl.SSLContext() # TLS (don't check broken chain) self._context.load_default_certs() @@ -143,14 +150,28 @@ def _get_ca_issuer_cert(self, url): as the CA Issuer URI in the AIA extension of the previous "node" (certificate) of the chain. """ - if urlsplit(url).scheme != "http": + url_parsed = urlsplit(url) + if url_parsed.scheme != "http": raise AIASchemeError("Invalid CA issuer certificate URI protocol") + if self.cert_cache: + cache_path = self.cert_cache + "/" + url_parsed.netloc + url_parsed.path + else: + cache_path = None + if cache_path and os.path.exists(cache_path): + with open(cache_path, "rb") as f: + return f.read() # read cache logger.debug(f"Downloading CA issuer certificate at {url}") req = Request(url=url, headers={"User-Agent": self.user_agent}) with urlopen(req) as resp: if resp.status != 200: raise AIADownloadError(f"HTTP {resp.status} (CA Issuer Cert.)") - return resp.read() + data = resp.read() + # check again if cache_path exists. can have multiple writers + if cache_path and not os.path.exists(cache_path): + os.makedirs(os.path.dirname(cache_path), exist_ok=True) + with open(cache_path, "wb") as f: + f.write(data) # write cache + return data def aia_chase(self, host): """