From 90bffd6fb564fd74294fb7b9923d4a5b9e7cdc62 Mon Sep 17 00:00:00 2001 From: "Jason C. Leach" Date: Fri, 2 Feb 2024 18:20:54 -0800 Subject: [PATCH] fix: add better logging (#26) Signed-off-by: Jason C. Leach --- devops/charts/controller/values.yaml | 2 +- fixtures/test_traction_proof_request.json | 18 +-- src/apple.py | 160 ++++++++++++++-------- src/goog.py | 53 ++++--- 4 files changed, 145 insertions(+), 88 deletions(-) diff --git a/devops/charts/controller/values.yaml b/devops/charts/controller/values.yaml index c0ac44e..cbb81ef 100644 --- a/devops/charts/controller/values.yaml +++ b/devops/charts/controller/values.yaml @@ -14,7 +14,7 @@ image: registry: ghcr.io repository: bcgov/mobile-attestation-vc-controller/controller # Overrides the image tag whose default is the chart appVersion. - tag: "5858a94" + tag: "fdbeea6" env: TRACTION_BASE_URL: "https://traction-tenant-proxy-dev.apps.silver.devops.gov.bc.ca" diff --git a/fixtures/test_traction_proof_request.json b/fixtures/test_traction_proof_request.json index d551235..1fec756 100644 --- a/fixtures/test_traction_proof_request.json +++ b/fixtures/test_traction_proof_request.json @@ -5,19 +5,21 @@ "requested_attributes": { "attestationInfo": { "names": [ - "Assurance Level", - "Issued At" + "operating_system_version", + "validation_method", + "app_id", + "app_vendor", + "issue_date_dateint", + "operating_system", + "app_version" ], "restrictions": [ { - "schema_id": "J6LCm5Edi9Mi3ASZCqNC1A:2:dev-attestation-schema:1.0", - "issuer_did": "J6LCm5Edi9Mi3ASZCqNC1A" + "schema_id": "NXp6XcGeCR2MviWuY51Dva:2:app_attestation:1.0", + "issuer_did": "NXp6XcGeCR2MviWuY51Dva" }, { - "cred_def_id": "J6LCm5Edi9Mi3ASZCqNC1A:3:CL:109799:dev-attestation" - }, - { - "cred_def_id": "NxWbeuw8Y2ZBiTrGpcK7Tn:3:CL:48312:default" + "cred_def_id": "NXp6XcGeCR2MviWuY51Dva:3:CL:33557:bcwallet" } ] } diff --git a/src/apple.py b/src/apple.py index a8a84ed..5924e0f 100644 --- a/src/apple.py +++ b/src/apple.py @@ -11,15 +11,28 @@ import hashlib import requests import os +import logging from dotenv import load_dotenv -from constants import app_id, rp_id_hash_end, counter_start, counter_end, aaguid_start, aaguid_end, cred_id_start - +from constants import ( + app_id, + rp_id_hash_end, + counter_start, + counter_end, + aaguid_start, + aaguid_end, + cred_id_start, +) from cryptography.exceptions import InvalidSignature -load_dotenv() +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +if os.getenv("FLASK_ENV") == "development": + load_dotenv() AppleAppAttestStatement = Dict[str, Union[str, Dict[str, List[bytes]], bytes]] + def fetch_apple_attestation_root_ca_cert(): url = os.getenv("APPLE_ATTESTATION_ROOT_CA_URL") response = requests.get(url) @@ -28,22 +41,25 @@ def fetch_apple_attestation_root_ca_cert(): return cert -def decode_apple_attestation_object(object_as_base64: str) -> Union[AppleAppAttestStatement, None]: + +def decode_apple_attestation_object( + object_as_base64: str, +) -> Union[AppleAppAttestStatement, None]: try: binary_data = base64.b64decode(object_as_base64) return cbor.loads(binary_data) except Exception as e: # Throws on invalid input - print(e) + logger.info(e) return None def create_authdata_with_nonce_hash(attestation_object, nonce): - hash = hashlib.sha256(nonce.encode('utf-8')).digest() + hash = hashlib.sha256(nonce.encode("utf-8")).digest() client_data_hash = hash - concatenated_buffer = attestation_object['authData'] + client_data_hash + concatenated_buffer = attestation_object["authData"] + client_data_hash return concatenated_buffer @@ -57,17 +73,23 @@ def create_composite_nonce(concatenated_buffer): def verify_x5c_certificates(attestation_object): try: root_certificate = fetch_apple_attestation_root_ca_cert() - credential_certificate = x509.load_der_x509_certificate(attestation_object['attStmt']['x5c'][0], default_backend()) - intermediate_certificate = x509.load_der_x509_certificate(attestation_object['attStmt']['x5c'][1], default_backend()) + credential_certificate = x509.load_der_x509_certificate( + attestation_object["attStmt"]["x5c"][0], default_backend() + ) + intermediate_certificate = x509.load_der_x509_certificate( + attestation_object["attStmt"]["x5c"][1], default_backend() + ) - print('root_certificate', root_certificate.subject) - print('credential_certificate', credential_certificate.subject) - print('intermediate_certificate', intermediate_certificate.subject) + logger.info("root_certificate", root_certificate.subject) + logger.info("credential_certificate", credential_certificate.subject) + logger.info("intermediate_certificate", intermediate_certificate.subject) if intermediate_certificate.issuer == root_certificate.subject: - print('The child certificate was issued by the parent certificate.') + logger.info("The child certificate was issued by the parent certificate.") else: - print('The child certificate was not issued by the parent certificate.') + logger.info( + "The child certificate was not issued by the parent certificate." + ) # Verify the signature of the certificate using the public key of the root certificate @@ -81,7 +103,7 @@ def verify_x5c_certificates(attestation_object): intermediate_certificate_is_valid = root_certificate.public_key().verify( intermediate_certificate.signature, intermediate_certificate.tbs_certificate_bytes, - ec.ECDSA(intermediate_certificate.signature_hash_algorithm) + ec.ECDSA(intermediate_certificate.signature_hash_algorithm), ) credential_certificate_is_valid = intermediate_certificate.public_key().verify( @@ -90,40 +112,49 @@ def verify_x5c_certificates(attestation_object): ec.ECDSA(credential_certificate.signature_hash_algorithm), ) - if intermediate_certificate_is_valid is None and credential_certificate_is_valid is None: - print('The certificates are signed by the ROOT certificate.') + if ( + intermediate_certificate_is_valid is None + and credential_certificate_is_valid is None + ): + logger.info("The certificates are signed by the ROOT certificate.") return True except InvalidSignature as e: - print("The certificates are NOT signed by the ROOT certificate.") - print(e) + logger.info("The certificates are NOT signed by the ROOT certificate.") + logger.info(e) return False -def extract_attestation_object_extension(attestation_object, oid='1.2.840.113635.100.8.2'): +def extract_attestation_object_extension( + attestation_object, oid="1.2.840.113635.100.8.2" +): # Load the certificate from a file - credential_certificate = x509.load_der_x509_certificate(attestation_object['attStmt']['x5c'][0]) + credential_certificate = x509.load_der_x509_certificate( + attestation_object["attStmt"]["x5c"][0] + ) # Get the extension with OID 1.2.840.113635.100.8.2 - cred_cert_extension = credential_certificate.extensions.get_extension_for_oid(x509.ObjectIdentifier(oid)) + cred_cert_extension = credential_certificate.extensions.get_extension_for_oid( + x509.ObjectIdentifier(oid) + ) # Get the value of the extension cred_cert_extension_value = cred_cert_extension.value.value - decoded_data, _ = decoder.decode(cred_cert_extension_value, asn1Spec=univ.Sequence()) + decoded_data, _ = decoder.decode( + cred_cert_extension_value, asn1Spec=univ.Sequence() + ) return decoded_data[0].asOctets().hex() def is_valid_pem(pem): try: - serialization.load_pem_public_key( - pem, - backend=default_backend() - ) + serialization.load_pem_public_key(pem, backend=default_backend()) return True except ValueError: return False - + + def create_hash_from_pub_key(cred_certificate): certificate = x509.load_der_x509_certificate(cred_certificate, default_backend()) @@ -139,9 +170,11 @@ def create_hash_from_pub_key(cred_certificate): # Retrieve the X and Y coordinates of the public key x = public_key.public_numbers().x y = public_key.public_numbers().y - + # Convert the coordinates to byte strings and prepend with b'\x04' - public_key_bytes = b'\x04' + x.to_bytes(32, byteorder='big') + y.to_bytes(32, byteorder='big') + public_key_bytes = ( + b"\x04" + x.to_bytes(32, byteorder="big") + y.to_bytes(32, byteorder="big") + ) # Create a SHA256 hash of the public key bytes digest = hashes.Hash(hashes.SHA256()) @@ -153,17 +186,21 @@ def create_hash_from_pub_key(cred_certificate): return public_key_sha256_hex + def create_app_id_hash(): - app_id_bytes = app_id.encode('utf-8') + app_id_bytes = app_id.encode("utf-8") app_id_hash = hashlib.sha256(app_id_bytes).hexdigest() return app_id_hash + def verify_attestation_statement(attestation_object, nonce): try: # decode the attestation object is expecting attestation_object # to be JSON. - print('Decoding attestation object...') - apple_attestation_object = decode_apple_attestation_object(attestation_object['attestation_object']) + logger.info("Decoding attestation object...") + apple_attestation_object = decode_apple_attestation_object( + attestation_object["attestation_object"] + ) if not apple_attestation_object: return False @@ -171,7 +208,7 @@ def verify_attestation_statement(attestation_object, nonce): # certificates for App Attest, starting from the credential certificate in the first # data buffer in the array (credcert). Verify the validity of the certificates using # Apple’s App Attest root certificate. - print('Apple Attestation step 1...') + logger.info("Apple Attestation step 1...") verify_x5c_status = verify_x5c_certificates(apple_attestation_object) if not verify_x5c_status: return False @@ -179,70 +216,77 @@ def verify_attestation_statement(attestation_object, nonce): # 2. Create clientDataHash as the SHA256 hash of the one-time challenge your server sends # to your app before performing the attestation, and append that hash to the end of the # authenticator data (authData from the decoded object). - print('Apple Attestation step 2...') - authdata_with_nonce_hash = create_authdata_with_nonce_hash(apple_attestation_object, nonce) + logger.info("Apple Attestation step 2...") + authdata_with_nonce_hash = create_authdata_with_nonce_hash( + apple_attestation_object, nonce + ) # 3. Generate a new SHA256 hash of the composite item to create nonce. - print('Apple Attestation step 3...') + logger.info("Apple Attestation step 3...") composite_nonce = create_composite_nonce(authdata_with_nonce_hash) # 4. Obtain the value of the credCert extension with OID 1.2.840.113635.100.8.2, # which is a DER-encoded ASN.1 sequence. Decode the sequence and extract the single # octet string that it contains. Verify that the string equals nonce. - print('Apple Attestation step 4...') + logger.info("Apple Attestation step 4...") extension_value = extract_attestation_object_extension(apple_attestation_object) - if (extension_value != composite_nonce): + if extension_value != composite_nonce: return False # 5. Create the SHA256 hash of the public key in credCert, and verify that it matches the # key identifier from your app. - print('Apple Attestation step 5...') - pub_key_hash = create_hash_from_pub_key(apple_attestation_object['attStmt']['x5c'][0]) - key_id_b64 = base64.b64decode(attestation_object['key_id']) - if (key_id_b64.hex() != pub_key_hash): + logger.info("Apple Attestation step 5...") + pub_key_hash = create_hash_from_pub_key( + apple_attestation_object["attStmt"]["x5c"][0] + ) + key_id_b64 = base64.b64decode(attestation_object["key_id"]) + if key_id_b64.hex() != pub_key_hash: return False # 6. Compute the SHA256 hash of your app’s App ID, and verify that it’s the same as the # authenticator data’s RP ID hash. - print('Apple Attestation step 6...') + logger.info("Apple Attestation step 6...") app_id_hash = create_app_id_hash() - rp_id_hash = apple_attestation_object['authData'][:rp_id_hash_end].hex() - if (rp_id_hash != app_id_hash): + rp_id_hash = apple_attestation_object["authData"][:rp_id_hash_end].hex() + if rp_id_hash != app_id_hash: return False # 7. Verify that the authenticator data’s counter field equals 0. See # https://www.w3.org/TR/webauthn/#sctn-attestation for byte start and end points. - print('Apple Attestation step 7...') - counter = apple_attestation_object['authData'][counter_start:counter_end] - if (counter != bytearray(b'\x00\x00\x00\x00')): + logger.info("Apple Attestation step 7...") + counter = apple_attestation_object["authData"][counter_start:counter_end] + if counter != bytearray(b"\x00\x00\x00\x00"): return False # 8. Verify that the authenticator data’s aaguid field is either appattestdevelop if # operating in the development environment, or appattest followed by seven 0x00 # bytes if operating in the production environment. - print('Apple Attestation step 8...') - aaguid = apple_attestation_object['authData'][aaguid_start:aaguid_end] + logger.info("Apple Attestation step 8...") + aaguid = apple_attestation_object["authData"][aaguid_start:aaguid_end] # this step is failing so commenting out for now - if (aaguid != bytearray(b'appattestdevelop') and aaguid != bytearray(b'appattest\x00\x00\x00\x00\x00\x00\x00')): + if aaguid != bytearray(b"appattestdevelop") and aaguid != bytearray( + b"appattest\x00\x00\x00\x00\x00\x00\x00" + ): return False # 9. Verify that the authenticator data’s credentialId field is the same as the # key identifier. - print('Apple Attestation step 9...') - key_identifier = base64.b64decode(attestation_object['key_id']) + logger.info("Apple Attestation step 9...") + key_identifier = base64.b64decode(attestation_object["key_id"]) cred_id_length = len(key_identifier) cred_id_end = cred_id_start + cred_id_length - credential_id = apple_attestation_object['authData'][cred_id_start:cred_id_end] - if (credential_id != key_identifier): + credential_id = apple_attestation_object["authData"][cred_id_start:cred_id_end] + if credential_id != key_identifier: return False - print('Successful apple attestation') + logger.info("Successful apple attestation") return True except Exception as e: - print('Error during Apple attestation:', e) + logger.info("Error during Apple attestation:", e) return False + def main(): pass diff --git a/src/goog.py b/src/goog.py index b722980..fb18bcd 100644 --- a/src/goog.py +++ b/src/goog.py @@ -1,53 +1,64 @@ +import os +import logging from googleapiclient.discovery import build from google.oauth2 import service_account -import os from dotenv import load_dotenv -load_dotenv() +if os.getenv("FLASK_ENV") == "development": + load_dotenv() + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) path = os.getenv("GOOGLE_AUTH_JSON_PATH") creds = service_account.Credentials.from_service_account_file( - path, scopes=['https://www.googleapis.com/auth/playintegrity'] + path, scopes=["https://www.googleapis.com/auth/playintegrity"] ) + # should eventually confirm nonce matches here def isValidVerdict(verdict, nonce): try: - valid_device_verdicts = ['MEETS_DEVICE_INTEGRITY'] - verdict_nonce = verdict['tokenPayloadExternal']['requestDetails']['nonce'] - request_package_name = verdict['tokenPayloadExternal']['requestDetails']['requestPackageName'] - package_name = verdict['tokenPayloadExternal']['appIntegrity']['packageName'] + valid_device_verdicts = ["MEETS_DEVICE_INTEGRITY"] + verdict_nonce = verdict["tokenPayloadExternal"]["requestDetails"]["nonce"] + request_package_name = verdict["tokenPayloadExternal"]["requestDetails"][ + "requestPackageName" + ] + package_name = verdict["tokenPayloadExternal"]["appIntegrity"]["packageName"] # app_verdict = verdict['tokenPayloadExternal']['appIntegrity']['appRecognitionVerdict'] - device_verdicts = verdict['tokenPayloadExternal']['deviceIntegrity']['deviceRecognitionVerdict'] + device_verdicts = verdict["tokenPayloadExternal"]["deviceIntegrity"][ + "deviceRecognitionVerdict" + ] if ( - verdict_nonce == nonce and - request_package_name == 'ca.bc.gov.BCWallet' and - package_name == 'ca.bc.gov.BCWallet' and - # app_verdict == 'PLAY_RECOGNIZED' and - set(valid_device_verdicts).issubset(device_verdicts) + verdict_nonce == nonce + and request_package_name == "ca.bc.gov.BCWallet" + and package_name == "ca.bc.gov.BCWallet" + and + # app_verdict == 'PLAY_RECOGNIZED' and + set(valid_device_verdicts).issubset(device_verdicts) ): return True else: return False except Exception as e: - print('Error evaluating verdict:', e) + logger.info("Error evaluating verdict:", e) return False # decrypt the integrity token on google's servers def verify_integrity_token(token, nonce): try: - service = build('playintegrity', 'v1', credentials=creds) - body = { - "integrityToken": token - } + service = build("playintegrity", "v1", credentials=creds) + body = {"integrityToken": token} instance = service.v1() - verdict = instance.decodeIntegrityToken(packageName='ca.bc.gov.BCWallet', body=body).execute() - if (isValidVerdict(verdict, nonce)): + verdict = instance.decodeIntegrityToken( + packageName="ca.bc.gov.BCWallet", body=body + ).execute() + if isValidVerdict(verdict, nonce): return True else: return False except Exception as e: - print('Error verifying integrity token:', e) + logger.info("Error verifying integrity token:", e) return False