-
Notifications
You must be signed in to change notification settings - Fork 2
/
app.py
225 lines (187 loc) · 8.77 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
import hashlib
import json
import os
from base64 import b64encode
import re
import cbor2
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import hashes
from flask import Flask, render_template, request, redirect, session
from flask_oidc import OpenIDConnect
from keycloak import KeycloakAdmin
from webauthn import (
generate_registration_options, generate_authentication_options, options_to_json,
verify_registration_response, verify_authentication_response, base64url_to_bytes, bytes_to_base64url
)
from webauthn.helpers.cose import COSEAlgorithmIdentifier
from webauthn.helpers.structs import (
AuthenticationExtensionsLargeBlobInputs, LargeBlobSupport, AuthenticatorSelectionCriteria, ResidentKeyRequirement,
PublicKeyCredentialDescriptor, RegistrationCredential, AuthenticationCredential, AttestationConveyancePreference,
UserVerificationRequirement, CredentialProtectionPolicy
)
HOST_URL = os.environ["WAU_HOST_URL"]
RP_ID = re.search(r'https?://([^:]+)', HOST_URL).group(1)
app = Flask(__name__)
app.config["OIDC_CLIENT_SECRETS"] = "client_secrets.json"
app.config["OIDC_SCOPES"] = ["openid", "profile", "email"]
app.config["SECRET_KEY"] = "adfsdfsdfsdfsdf"
app.config["OVERWRITE_REDIRECT_URI"] = f"{HOST_URL}/oidc_callback"
oidc = OpenIDConnect(app)
signing_key = None
if "WAU_SIGNING_KEY_PATH" in os.environ:
with open(os.environ["WAU_SIGNING_KEY_PATH"], "rb") as f:
signing_key = serialization.load_pem_private_key(f.read(), None)
keycloak_admin = KeycloakAdmin(server_url=f"https://{os.environ['WAU_KEYCLOAK_HOST_NAME']}/auth/",
client_id=os.environ['WAU_KEYCLOAK_CLIENT_ID'],
client_secret_key=os.environ['WAU_KEYCLOAK_CLIENT_SECRET'],
realm_name="hotsir",
verify=True,
auto_refresh_token=['get', 'put', 'post', 'delete'])
def parse_credential_data(credential):
credential["credentialData"] = json.loads(credential["credentialData"])
return credential
def get_credentials_for_user(user_id):
credentials = json.loads(keycloak_admin.raw_get(f"admin/realms/hotsir/users/{user_id}/credentials").content)
return list(map(parse_credential_data, filter(lambda credential: credential['type'] == 'webauthn', credentials)))
def get_signed_access_rights():
access_rights = str(oidc.user_getfield("access_rights")).encode('utf-8')
public_key = base64url_to_bytes(session["selected_credential_publicKey"])
signature = signing_key.sign(access_rights + public_key, ec.ECDSA(hashes.SHA256())) if signing_key is not None else bytes()
return cbor2.dumps([access_rights, public_key, signature])
@app.route('/')
@oidc.require_login
def index():
return render_template(
'index.html',
username=oidc.user_getfield("preferred_username"),
given_name=oidc.user_getfield("given_name"),
family_name=oidc.user_getfield("family_name"),
)
@app.route('/logout')
def logout():
oidc.logout()
return redirect('/')
@app.route('/register')
@oidc.require_login
def register():
registration_options = generate_registration_options(
rp_id=RP_ID,
rp_name="Webauthn Updater.",
user_id=oidc.user_getfield('sub'),
user_name=oidc.user_getfield('preferred_username'),
user_display_name=oidc.user_getfield('name'),
large_blob_extension=AuthenticationExtensionsLargeBlobInputs(
support=LargeBlobSupport.REQUIRED,
),
credential_protection_policy=CredentialProtectionPolicy.USER_VERIFICATION_OPTIONAL,
authenticator_selection=AuthenticatorSelectionCriteria(
resident_key=ResidentKeyRequirement.REQUIRED,
user_verification=UserVerificationRequirement.DISCOURAGED
),
attestation=AttestationConveyancePreference.DIRECT,
# This is necessary as the microcontrollers verifying the signatures
# do not implement all algorithms.
supported_pub_key_algs=[COSEAlgorithmIdentifier.EDDSA],
)
session["last_challenge"] = registration_options.challenge
return options_to_json(registration_options)
@app.route('/register-response', methods=['POST'])
@oidc.require_login
def register_response():
credential = RegistrationCredential.parse_raw(request.get_data())
verified_registration = verify_registration_response(
credential=credential,
expected_challenge=session["last_challenge"],
expected_rp_id=RP_ID,
expected_origin=HOST_URL
)
user_id = oidc.user_getfield('sub')
credential = dict(
type="webauthn",
secretData="{}",
userLabel="webauthn-updater",
credentialData=json.dumps(dict(
credentialId=bytes_to_base64url(verified_registration.credential_id),
credentialPublicKey=bytes_to_base64url(verified_registration.credential_public_key),
aaguid=verified_registration.aaguid,
counter=verified_registration.sign_count,
attestationStatementFormat=verified_registration.fmt,
attestationStatement=bytes_to_base64url(verified_registration.attestation_object),
))
)
keycloak_admin.update_user(user_id=user_id, payload=dict(credentials=[credential]))
return '', 204
@app.route('/identify-credential')
@oidc.require_login
def identify_credential():
stored_credentials = get_credentials_for_user(oidc.user_getfield('sub'))
authentication_options = generate_authentication_options(
rp_id=RP_ID,
# We cannot rely completely on the discoverable credential feature here, as that could make the authenticator
# select a credential from another user.
allow_credentials=[
PublicKeyCredentialDescriptor(id=base64url_to_bytes(credential["credentialData"]["credentialId"]))
for credential in stored_credentials
]
)
session["last_challenge"] = authentication_options.challenge
return options_to_json(authentication_options)
@app.route('/authentication-response', methods=['POST'])
@oidc.require_login
def authentication_response():
stored_credentials = get_credentials_for_user(oidc.user_getfield('sub'))
if len(stored_credentials) == 0:
return 'User has not registered a credential', 400
credential = AuthenticationCredential.parse_raw(request.get_data())
stored_credential = None
for cred in stored_credentials:
if cred["credentialData"]["credentialId"] == credential.id:
stored_credential = cred
if stored_credential is None:
return 'Credential with this ID is not registered', 400
verified_authentication = verify_authentication_response(
credential=credential,
expected_challenge=session["last_challenge"],
expected_rp_id=RP_ID,
expected_origin=HOST_URL,
credential_public_key=base64url_to_bytes(stored_credential["credentialData"]["credentialPublicKey"]),
credential_current_sign_count=0
)
session["selected_credential_id"] = verified_authentication.credential_id
session["selected_credential_publicKey"] = stored_credential["credentialData"]["credentialPublicKey"]
return b64encode(verified_authentication.credential_id)
@app.route('/write-blob')
@oidc.require_login
def write_blob():
credentials = get_credentials_for_user(oidc.user_getfield('sub'))
if len(credentials) == 0:
return 'User has not registered a credential', 400
if (selected_credential_id := session["selected_credential_id"]) is None or session["selected_credential_publicKey"] is None:
return "User has not selected a credential to write to", 400
authentication_options = generate_authentication_options(
rp_id=RP_ID,
allow_credentials=[PublicKeyCredentialDescriptor(id=selected_credential_id)],
large_blob_extension=AuthenticationExtensionsLargeBlobInputs(
write=get_signed_access_rights()
)
)
return options_to_json(authentication_options)
@app.route('/read-blob')
@oidc.require_login
def read_blob():
credentials = get_credentials_for_user(oidc.user_getfield('sub'))
if len(credentials) == 0:
return 'User has not registered a credential', 400
if (selected_credential_id := session["selected_credential_id"]) is None:
return "User has not selected a credential to write to", 400
authentication_options = generate_authentication_options(
rp_id=RP_ID,
allow_credentials=[PublicKeyCredentialDescriptor(id=selected_credential_id)],
large_blob_extension=AuthenticationExtensionsLargeBlobInputs(
read=True
)
)
return options_to_json(authentication_options)
if __name__ == '__main__':
app.run(port=os.getenv("WAU_SERVER_PORT", 8002), host="localhost")