Skip to content

Commit

Permalink
use cryptography module
Browse files Browse the repository at this point in the history
  • Loading branch information
milahu committed May 25, 2024
1 parent 195a906 commit c127c28
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 59 deletions.
6 changes: 2 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ of each certificate,
and gets the root certificate locally, from the system.

**How does it validate the certificate chain?**
Through OpenSSL, which must be installed as an external dependency.
Through OpenSSL, via the [cryptography](https://github.com/pyca/cryptography) module.

**When should I use it?**
Ideally, never, but that might not be an option.
Expand All @@ -31,8 +31,6 @@ or get the intermediary certificates in the chain through AIA

## How to install

Anywhere, assuming OpenSSL is already installed:

```bash
pip install aia
```
Expand Down Expand Up @@ -94,7 +92,7 @@ context = aia_session.ssl_context_from_url(url)
resp = httpx.get(url, verify=context)
```

The certificate fetching part of this library and the OpenSSL call
The certificate fetching and validating part of this library
are blocking, so this library is still not prepared
for asynchronous code.
But one can easily make some workaround to use it, for example with
Expand Down
114 changes: 59 additions & 55 deletions aia.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@
import re
import socket
import ssl
import subprocess
from tempfile import NamedTemporaryFile
from urllib.request import urlopen, Request
from urllib.parse import urlsplit

# https://cryptography.io/en/latest/x509/
from cryptography import x509

import certifi


__version__ = "0.2.0"

Expand Down Expand Up @@ -67,29 +71,38 @@ def __get__(self, instance, owner):
return result


def get_cn_of_name(name):
# https://cryptography.io/en/latest/x509/reference/#cryptography.x509.Name
for attr in name:
if attr.rfc4514_attribute_name == "CN":
return attr.value


def get_ca_issuers_of_cert(cert):
try:
aia_extension = cert.extensions.get_extension_for_class(x509.AuthorityInformationAccess)
except x509.extensions.ExtensionNotFound:
return []
# https://cryptography.io/en/latest/x509/reference/#cryptography.x509.AccessDescription
ca_issuers = []
for access_description in aia_extension.value:
if access_description.access_method._name == "caIssuers":
ca_issuers.append(access_description.access_location.value)
return ca_issuers


def openssl_get_cert_info(cert_der):
"""
Get issuer, subject and AIA CA issuers (``aia_ca_issuers``)
from a DER certificate, using OpenSSL.
from a DER certificate.
"""
command_line = [
"openssl", "x509", "-inform", "DER", "-noout",
"-issuer", "-subject", "-ext", "authorityInfoAccess",
"-nameopt", "utf8,sep_comma_plus",
]
proc = subprocess.run(command_line, input=cert_der, capture_output=True)
output_pairs = re.findall(
r"^(issuer=|subject=|\s+CA\s*Issuers.*URI:)(.*)$",
proc.stdout.decode("utf-8"),
re.MULTILINE | re.IGNORECASE,
cert = x509.load_der_x509_certificate(cert_der)
cert_info = dict(
issuer = get_cn_of_name(cert.issuer),
subject = get_cn_of_name(cert.subject),
aia_ca_issuers = get_ca_issuers_of_cert(cert),
)
result = {"aia_ca_issuers": []}
for k, v in output_pairs:
if k.startswith(" "):
result["aia_ca_issuers"].append(v.strip())
else:
result[k.lower()[:-1]] = v.strip()
return result
return cert_info


class AIASession:
Expand Down Expand Up @@ -176,43 +189,34 @@ def validate_certificate_chain(self, der_certs):
as a list of DER (binary) certificates from leaf to root
(in this order and including both),
raising an ``ssl.SSLError`` when the chain isn't valid.
This method requires OpenSSL,
which should be available from the command line.
"""
with ExitStack() as stack:
def new_pem_file(data):
pf = stack.enter_context(
NamedTemporaryFile("wb", suffix=".pem"),
)
pf.write(data.encode("ascii"))
pf.flush()
return pf

pem_certs = [ssl.DER_cert_to_PEM_cert(dc) for dc in der_certs]
target_pem = new_pem_file(pem_certs[0])
intermediary_pem = new_pem_file("".join(pem_certs[1:-1]))
root_pem = new_pem_file(pem_certs[-1])

command_line = [
"openssl", "verify",
"-CAfile", root_pem.name,
"-untrusted", intermediary_pem.name,
target_pem.name,
]
openssl_proc = subprocess.run(command_line, capture_output=True)

# Logs the OpenSSL results
logger.debug("OpenSSL certificate chain validation results:")
for stream_name in ["stdout", "stderr"]:
msg = getattr(openssl_proc, stream_name)
if msg.strip():
for line in msg.decode("ascii").splitlines():
logger.debug(f"[{stream_name}] {line}")
logger.debug(f"[return code] {openssl_proc.returncode}")

if openssl_proc.returncode != 0:
raise ssl.SSLError("Certificate chain verification failed")

certs = list(map(x509.load_der_x509_certificate, der_certs))

target_cert = certs[0]
intermediary_cert_list = certs[1:-1]
root_cert = certs[-1]

# https://cryptography.io/en/latest/x509/verification/

with open(certifi.where(), "rb") as pems:
store = x509.verification.Store(x509.load_pem_x509_certificates(pems.read()))

builder = x509.verification.PolicyBuilder().store(store)

target_name = get_cn_of_name(target_cert.subject) # can be "*.example.com"
target_name = target_name.replace("*", "x") # fix: ValueError: invalid domain name
target_subject = x509.DNSName(target_name)

verifier = builder.build_server_verifier(target_subject)

try:
verifier.verify(target_cert, intermediary_cert_list)
except x509.verification.VerificationError:
raise ssl.SSLError
# this should be unreachable since target_name.replace
#except x509.UnsupportedGeneralNameType:
# raise ssl.SSLError

@CachedMethod
def cadata_from_host(self, host):
Expand Down
4 changes: 4 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
py_modules=["aia"],
include_package_data=True,
python_requires=">=3.6",
install_requires=[
"cryptography",
"certifi",
],
classifiers=[
"Development Status :: 2 - Pre-Alpha",
"Environment :: Web Environment",
Expand Down

0 comments on commit c127c28

Please sign in to comment.