forked from asheiduk/dyndns53
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdyndns53.py
234 lines (193 loc) · 6.76 KB
/
dyndns53.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
import json
from ipaddress import ip_address, IPv4Address, IPv6Address, AddressValueError
from base64 import b64decode
import boto3
class ClientError(Exception):
    """Base class for failures attributed to the request itself.

    Subclasses carry ``status`` and ``response`` class attributes, which
    ``lambda_handler`` copies into the JSON error payload.
    """
class AuthorizationMissing(ClientError):
    """Raised when the request has no ``Authorization`` header."""

    # Copied into the 'status' field of the JSON error payload.
    status = 401
    # Unlike the sibling exceptions this is a mapping rather than a short
    # code; presumably the API Gateway mapping turns it into a response
    # header -- TODO confirm against the gateway configuration.
    response = {"WWW-Authenticate": "Basic realm=dyndns53"}
class HostnameException(ClientError):
    """Raised when the requested host is not configured for the caller."""

    # Copied into the 'status' field of the JSON error payload.
    status = 404
    # Short result code copied into the 'response' field.
    response = "nohost"
class AuthorizationException(ClientError):
    """Raised when the supplied username/password pair is not in ``conf``."""

    # Copied into the 'status' field of the JSON error payload.
    status = 403
    # Short result code copied into the 'response' field.
    response = "badauth"
class BadAgentException(ClientError):
    """Raised for malformed requests (bad auth header, missing or invalid parameters)."""

    # Copied into the 'status' field of the JSON error payload.
    status = 400
    # Short result code copied into the 'response' field.
    response = "badagent"
# Static account table consulted by _handler.
#
# Top-level keys are literal 'username:password' strings; they are compared
# against the decoded HTTP basic-auth credentials of each request.
conf = {
    '<username>:<password>': {
        # Hosts this account may update, keyed by fully-qualified name with a
        # trailing dot (_handler appends the dot before looking a host up).
        'hosts': {
            '<host.example.com.>': {
                # Route 53 hosted zone that contains the record.
                'zone_id': '<MY_ZONE_ID>',
                'record': {
                    # TTL for the managed record, in seconds.
                    'ttl': 60,
                    # Not read by any code visible in this file.
                    'type': 'A',
                },
            },
        },
    },
}
def lambda_handler(event, context):
    """Lambda entry point: run ``_handler`` and normalise its outcome.

    Args:
        event: Request mapping handed over by the Lambda integration; passed
            through to ``_handler`` unchanged.
        context: Lambda context object; passed through unchanged.

    Returns:
        ``{'status': 200, 'response': <text returned by _handler>}``.

    Raises:
        Exception: Any failure is re-raised with a message that is a JSON
            document holding ``status``, ``response`` and ``additional`` (the
            original error text). ``ClientError`` subclasses supply their own
            status/response; everything else is reported as 500 / ``'911'``.
            The original exception is attached as ``__cause__``.
    """

    def json_error(e, status, response):
        # Build an exception whose message is the JSON error payload.
        msg = json.dumps({'status': status, 'response': response, 'additional': str(e)})
        try:
            # Keep the original exception class whenever it can be built
            # from a single message argument.
            return type(e)(msg)
        except Exception:
            # Some exception classes need other constructor arguments (for
            # example botocore's ClientError or UnicodeDecodeError).
            # Re-instantiating them would itself fail and hide the JSON
            # payload, so fall back to a plain Exception.
            return Exception(msg)

    try:
        response = _handler(event, context)
    except ClientError as e:
        raise json_error(e, status=e.status, response=e.response) from e
    except Exception as e:
        # Top-level boundary: report every unexpected failure as a 500.
        raise json_error(e, status=500, response='911') from e
    return {'status': 200, 'response': response}
def _handler(event, context):
    """Authenticate a dynamic-DNS update request and apply it to Route 53.

    Reads from ``event``:
        * ``event['header']['Authorization']`` -- HTTP basic-auth header.
        * ``event['querystring']['hostname']`` -- host to update; a trailing
          dot is appended when missing.
        * ``event['querystring']['myip']`` -- optional comma-separated list
          with at most one IPv4 and one IPv6 address.
        * ``event['context']['source-ip']`` -- used when ``myip`` is absent
          or contains no address.

    Args:
        event: Request mapping as described above.
        context: Lambda context object (unused).

    Returns:
        ``'good <ip>'`` when Route 53 was changed, ``'nochg <ip>'`` when the
        records were already up to date. ``<ip>`` is the IPv4 address when
        one is known, otherwise the IPv6 address.

    Raises:
        KeyError: ``event`` has no ``'header'`` entry, or no TTL is
            configured for the host.
        AuthorizationMissing: No ``Authorization`` header was sent.
        BadAgentException: The auth header cannot be decoded, ``hostname`` is
            missing, or ``myip`` is invalid.
        AuthorizationException: The credentials are not present in ``conf``.
        HostnameException: The host is not configured for these credentials.
    """
    if 'header' not in event:
        msg = "Headers not populated properly. Check API Gateway configuration."
        raise KeyError(msg)

    try:
        auth_header = event['header']['Authorization']
    except KeyError:
        raise AuthorizationMissing("Authorization required but not provided.")

    try:
        credentials = b64decode(auth_header[len('Basic '):]).decode('utf-8')
        # Split on the first colon only: the password may itself contain ':'.
        auth_user, auth_pass = credentials.split(':', 1)
    except Exception:
        msg = "Malformed basicauth string: {}"
        raise BadAgentException(msg.format(auth_header))

    auth_string = ':'.join([auth_user, auth_pass])
    if auth_string not in conf:
        raise AuthorizationException("Bad username/password.")

    try:
        host = event['querystring']['hostname']
    except KeyError:
        raise BadAgentException("Hostname required but not provided.")

    # Hosts in conf are keyed by fully-qualified name with a trailing dot.
    if not host.endswith('.'):
        host += '.'

    if host not in conf[auth_string]['hosts']:
        raise HostnameException()
    host_conf = conf[auth_string]['hosts'][host]

    # The TTL lives under 'record' in this file's conf layout; a flat 'ttl'
    # key on the host entry is still honoured as a fallback.
    record_conf = host_conf.get('record', {})
    ttl = record_conf['ttl'] if 'ttl' in record_conf else host_conf['ttl']

    ipv4, ipv6 = None, None
    try:
        myip = event['querystring']['myip']
    except KeyError:
        myip = None

    if myip is not None:
        ipv4, ipv6 = parse_myip(myip)
        logger.debug(f'User supplied IP address(es): IPs: {ipv4}, {ipv6}')

    if ipv4 is None and ipv6 is None:
        # No usable address supplied (parameter missing or empty): fall back
        # to the caller's address so that an empty 'myip=' cannot silently
        # delete every record of the host.
        # possible bug: there is no source-ip (perhaps due to mapping errors or due to unknown protocols?)
        ip = ip_address(event['context']['source-ip'])
        if ip.version == 4:
            ipv4, ipv6 = ip, None
        elif ip.version == 6:
            ipv4, ipv6 = None, ip
        logger.debug(f'User omitted IP address, using best-guess from $context: {ip}')

    # prefer IPv4 if both are supplied
    ip = ipv4 or ipv6

    if r53_upsert(host, host_conf['zone_id'], ttl, ipv4, ipv6):
        return f'good {ip}'
    else:
        return f'nochg {ip}'
def parse_myip(myip: str) -> (IPv4Address, IPv6Address):
    """Parse the ``myip`` request parameter into an (IPv4, IPv6) pair.

    ``myip`` is a comma-separated list holding at most one IPv4 and one IPv6
    address, in any order. Whitespace around a component is ignored and empty
    components are skipped.

    Args:
        myip: Raw parameter value, e.g. ``'192.0.2.1,2001:db8::1'``.

    Returns:
        A tuple ``(ipv4, ipv6)``; each element is the parsed address object,
        or ``None`` when no address of that family was supplied.

    Raises:
        BadAgentException: A component is not a valid IP address, or more
            than one address of the same family was supplied.
    """
    ipv4, ipv6 = None, None
    for ipstring in myip.split(','):
        # Tolerate "a, b" style lists and stray or trailing commas.
        ipstring = ipstring.strip()
        if not ipstring:
            continue
        try:
            ip = ip_address(ipstring)
        except ValueError:
            raise BadAgentException(f'Invalid IP string: {myip}')
        if ip.version == 4:
            if ipv4 is not None:
                raise BadAgentException(f'More than one IPv4 address provided: {myip}')
            ipv4 = ip
        elif ip.version == 6:
            if ipv6 is not None:
                raise BadAgentException(f'More than one IPv6 address provided: {myip}')
            ipv6 = ip
    return ipv4, ipv6
# Module-level Route 53 client, created once at import time and shared by
# r53_upsert and get_ips.
client53 = boto3.client('route53', region_name='us-west-2')
def r53_upsert(host, zone_id, ttl, ipv4, ipv6):
    """Bring the host's A/AAAA records in line with the given addresses.

    The current records are read with ``get_ips``, the required changes are
    computed by ``create_change_batch`` and, if there are any, submitted to
    Route 53 in a single request.

    Args:
        host: Record name to manage.
        zone_id: Route 53 hosted zone containing the record.
        ttl: TTL placed on the records in the change batch.
        ipv4: Desired IPv4 address, or ``None`` for no A record.
        ipv6: Desired IPv6 address, or ``None`` for no AAAA record.

    Returns:
        ``True`` when a change batch was submitted, ``False`` when the
        existing records already matched.
    """
    current_v4, current_v6 = get_ips(host, zone_id)
    logger.debug(f'Old IPs: {current_v4}, {current_v6} -- New IPs: {ipv4}, {ipv6}')

    # One (existing, desired) pair per address family.
    change_batch = create_change_batch(
        host,
        ttl,
        [(current_v4, ipv4), (current_v6, ipv6)],
    )
    pending = change_batch['Changes']

    if not pending:
        logger.debug('No changes for route53')
        return False

    logger.debug('Performing %s change(s) in route53', len(pending))
    client53.change_resource_record_sets(
        HostedZoneId=zone_id,
        ChangeBatch=change_batch,
    )
    return True
def get_ips(host: str, zone_id: str):
    """Look up the A and AAAA values currently stored for ``host``.

    Args:
        host: Record name; a trailing dot is appended when missing.
        zone_id: Route 53 hosted zone to query.

    Returns:
        A tuple ``(ipv4, ipv6)`` of ``IPv4Address`` / ``IPv6Address`` objects;
        an element is ``None`` when no matching record set was returned.

    Raises:
        ValueError: A matching record set holds more than one value.
    """
    if not host.endswith('.'):
        host += '.'

    listing = client53.list_resource_record_sets(
        HostedZoneId=zone_id,
        StartRecordName=host,
        StartRecordType='A',
        MaxItems='2',  # A and AAAA should be adjacent
    )

    def single_value(entry):
        # Only record sets holding exactly one value are supported.
        values = entry['ResourceRecords']
        if len(values) > 1:
            raise ValueError(f'Multiple existing records found for host {host} in zone_id {zone_id}')
        return values[0]['Value']

    parsers = {'A': IPv4Address, 'AAAA': IPv6Address}
    found = {'A': None, 'AAAA': None}
    for entry in listing['ResourceRecordSets']:
        # The listing only *starts* at the requested name, so it can include
        # record sets that belong to other names.
        if entry['Name'] != host:
            continue
        parser = parsers.get(entry['Type'])
        if parser is not None:
            found[entry['Type']] = parser(single_value(entry))

    # Either element may still be None.
    return (found['A'], found['AAAA'])
def create_change_batch(host: str, ttl: int, ips):
    """Build a Route 53 change batch from (existing, desired) address pairs.

    For each pair whose two sides differ, an ``UPSERT`` is emitted when a
    desired address is given; otherwise the existing address is removed with
    a ``DELETE``. Pairs that are equal, or empty on both sides, produce
    nothing.

    Args:
        host: Record name placed in every change.
        ttl: TTL placed in every change.
        ips: Iterable of ``(old_ip, new_ip)`` pairs; either side may be
            ``None``. Addresses must expose ``.version`` (4 or 6).

    Returns:
        ``{'Changes': [...]}`` with one entry per differing pair, in input
        order. The list is empty when nothing has to change.
    """
    record_types = {4: 'A', 6: 'AAAA'}

    def build_change(action, ip):
        # NOTE(review): DELETE reuses the configured ttl. Route 53 is
        # understood to require a deleted record set to match the stored one
        # exactly -- confirm behaviour when the stored TTL differs.
        return {
            'Action': action,
            'ResourceRecordSet': {
                'Name': host,
                'Type': record_types.get(ip.version),
                'TTL': ttl,
                'ResourceRecords': [{'Value': str(ip)}],
            },
        }

    changes = []
    for previous, desired in ips:
        differs = (previous or desired) and previous != desired
        if not differs:
            continue
        if desired:
            changes.append(build_change('UPSERT', desired))
        elif previous:
            changes.append(build_change('DELETE', previous))
    return {'Changes': changes}