Skip to content

Commit

Permalink
add option cert_cache
Browse files Browse the repository at this point in the history
  • Loading branch information
milahu committed May 25, 2024
1 parent 3c284e0 commit 340aa86
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 3 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ context = aia_session.ssl_context_from_url(url)
response = urlopen(url, context=context)
```

To cache the downloaded intermediary certificates on disk:

```py
aia_session = AIASession(cert_cache="/tmp/aia-certs")
```

The context methods also helps when working with HTTP client libraries.
For example, with [`requests`](http://python-requests.org/):

Expand Down
27 changes: 24 additions & 3 deletions aia.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from contextlib import ExitStack
from functools import lru_cache, partial
import logging
Expand Down Expand Up @@ -107,8 +108,14 @@ def openssl_get_cert_info(cert_der):

class AIASession:

def __init__(self, user_agent=DEFAULT_USER_AGENT):
def __init__(self, user_agent=DEFAULT_USER_AGENT, cert_cache=None):
"""
Create a new session.
cert_cache is a directory path,
where downloaded intermediary certificates are stored.
"""
self.user_agent = user_agent
self.cert_cache = cert_cache
self._context = ssl.SSLContext() # TLS (don't check broken chain)
self._context.load_default_certs()

Expand Down Expand Up @@ -143,14 +150,28 @@ def _get_ca_issuer_cert(self, url):
as the CA Issuer URI in the AIA extension
of the previous "node" (certificate) of the chain.
"""
if urlsplit(url).scheme != "http":
url_parsed = urlsplit(url)
if url_parsed.scheme != "http":
raise AIASchemeError("Invalid CA issuer certificate URI protocol")
if self.cert_cache:
cache_path = self.cert_cache + "/" + url_parsed.netloc + url_parsed.path
else:
cache_path = None
if cache_path and os.path.exists(cache_path):
with open(cache_path, "rb") as f:
return f.read() # read cache
logger.debug(f"Downloading CA issuer certificate at {url}")
req = Request(url=url, headers={"User-Agent": self.user_agent})
with urlopen(req) as resp:
if resp.status != 200:
raise AIADownloadError(f"HTTP {resp.status} (CA Issuer Cert.)")
return resp.read()
data = resp.read()
# check again if cache_path exists. can have multiple writers
if cache_path and not os.path.exists(cache_path):
os.makedirs(os.path.dirname(cache_path), exist_ok=True)
with open(cache_path, "wb") as f:
f.write(data) # write cache
return data

def aia_chase(self, host):
"""
Expand Down

0 comments on commit 340aa86

Please sign in to comment.