-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathflaskserver.py
128 lines (101 loc) · 4.18 KB
/
flaskserver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import os
import json
import base64
import time
from flask import Flask, request, jsonify
from .server import PINServerECDH, PINServerECDHv1, PINServerECDHv2
from .pindb import PINDb
from wallycore import AES_KEY_LEN_256, AES_BLOCK_LEN, HMAC_SHA256_LEN
from dotenv import load_dotenv
# Time we will retain active sessions, in seconds.
# ie. maximum time allowed 'start_handshake' (which creates the session)
# and the get-/set-pin call, which utilises it.
# Can be set in environment, defaults to 5mins
load_dotenv()
SESSION_LIFETIME = int(os.environ.get('SESSION_LIFETIME', 300))
def flask_server():
# Load, verify, and cache server static key at startup
# (Refuse to start if key non-existing or invalid)
PINServerECDH.load_private_key()
sessions = {}
app = Flask(__name__)
def _cleanup_expired_sessions():
nonlocal sessions
time_now = int(time.time())
sessions = dict(filter(
lambda s: time_now - s[1].time_started < SESSION_LIFETIME,
sessions.items()))
@app.route('/', methods=['GET'])
def alive():
return ""
@app.route('/start_handshake', methods=['POST'])
def start_handshake_route():
app.logger.debug('Number of sessions {}'.format(len(sessions)))
# Create a new ephemeral server/session and get its signed pubkey
e_ecdh_server = PINServerECDHv1()
pubkey, sig = e_ecdh_server.get_signed_public_key()
ske = pubkey.hex()
# Cache new session
_cleanup_expired_sessions()
sessions[ske] = e_ecdh_server
# Return response
return jsonify({'ske': ske,
'sig': sig.hex()})
# NOTE: explicit fields in protocol v1
def _complete_server_call_v1(pin_func, udata):
ske = udata['ske']
assert 'replay_counter' not in udata
# Get associated session (ensuring not stale)
_cleanup_expired_sessions()
e_ecdh_server = sessions[ske]
# get/set pin and get response data
encrypted_key, hmac = e_ecdh_server.call_with_payload(
bytes.fromhex(udata['cke']),
bytes.fromhex(udata['encrypted_data']),
bytes.fromhex(udata['hmac_encrypted_data']),
pin_func)
# Expecting to return an encrypted aes-key with separate hmac
assert len(encrypted_key) == AES_KEY_LEN_256 + (2*AES_BLOCK_LEN)
assert len(hmac) == HMAC_SHA256_LEN
# Cleanup session
del sessions[ske]
_cleanup_expired_sessions()
# Return response
return jsonify({'encrypted_key': encrypted_key.hex(),
'hmac': hmac.hex()})
# NOTE: v2 is one concatentated field, base64-encoded
def _complete_server_call_v2(pin_func, udata):
assert 'data' in udata
data = base64.b64decode(udata['data'].encode())
assert len(data) > 37 # cke and counter and some encrypted payload
cke = data[:33]
replay_counter = data[33:37]
encrypted_data = data[37:]
e_ecdh_server = PINServerECDHv2(replay_counter, cke)
encrypted_key = e_ecdh_server.call_with_payload(
cke,
encrypted_data,
pin_func)
# Expecting to return an encrypted aes-key with hmac appended
assert len(encrypted_key) == AES_KEY_LEN_256 + (2*AES_BLOCK_LEN) + HMAC_SHA256_LEN
# Return response
return jsonify({'data': base64.b64encode(encrypted_key).decode()})
def _complete_server_call(pin_func):
try:
# Get request data
udata = json.loads(request.data)
if 'data' in udata:
return _complete_server_call_v2(pin_func, udata)
return _complete_server_call_v1(pin_func, udata)
except Exception as e:
app.logger.error("Error: {} {}".format(type(e), e))
app.logger.error("Request body: {}".format(request.data))
raise e
@app.route('/get_pin', methods=['POST'])
def get_pin_route():
return _complete_server_call(PINDb.get_aes_key)
@app.route('/set_pin', methods=['POST'])
def set_pin_route():
return _complete_server_call(PINDb.set_pin)
return app
app = flask_server()