forked from alexplesoiu/dns-updater
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
302 lines (236 loc) · 8.45 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
"""
DNS Updater
Author: Alexandru-Ioan Plesoiu
GitHub: https://github.com/alexplesoiu
Documentation: https://github.com/alexplesoiu/dns-updater
DNS Updater is a Python-based tool that automatically updates Cloudflare DNS records
with your public IP address. If your server's IP address changes frequently or you
have a dynamic IP, this tool ensures that your domains and subdomains always point
to the correct server. It can handle multiple domains and subdomains from multiple
zones, with proxying enabled or disabled. The tool runs checks and updates on a
configurable schedule (SCHEDULE_MINUTES environment variable, 60 minutes by default)
and includes redundancy for IP checking services.
"""
import time
import socket
import logging
import sys
import os
import json
import requests
import schedule
# Runtime configuration, read from environment variables.
# Variables that are not set resolve to None (SCHEDULE_MINUTES falls back to 60).
CF_API_TOKEN = os.environ.get("CF_API_TOKEN")
CF_ZONE_ID = os.environ.get("CF_ZONE_ID")
DNS_RECORD_COMMENT_KEY = os.environ.get("DNS_RECORD_COMMENT_KEY")
DOMAINS_FILE_PATH = os.environ.get("DOMAINS_FILE_PATH")
SCHEDULE_MINUTES = int(os.environ.get("SCHEDULE_MINUTES", "60"))

# Root of the Cloudflare v4 API; endpoint paths are appended to it.
BASE_URL = "https://api.cloudflare.com/client/v4/"

# Public-IP lookup services, tried in this order until one answers.
IP_CHECK_SERVICES = [
    "https://adresameaip.ro/ip",
    "https://api.ipify.org",
    "https://icanhazip.com",
    "https://ipinfo.io/ip",
]
def create_logger(level=logging.INFO):
    """Create (or reconfigure) the "MGE-Logs" logger.

    The logger writes to stdout at ``level`` and to ``dns_updater.log`` at
    WARNING and above.

    Args:
        level: Threshold applied to the logger itself and to the console
            handler.

    Returns:
        The configured ``logging.Logger`` instance.
    """
    logger = logging.getLogger("MGE-Logs")
    logger.setLevel(level)

    # logging.getLogger() hands back the same object for the same name, so a
    # second call would otherwise attach another pair of handlers and every
    # message would be emitted twice. Reuse the handlers that are already there.
    if logger.handlers:
        for handler in logger.handlers:
            # FileHandler subclasses StreamHandler; only the console handler
            # follows `level`, the file handler stays at WARNING.
            if not isinstance(handler, logging.FileHandler):
                handler.setLevel(level)
        return logger

    # Create handlers
    console_handler = logging.StreamHandler(sys.stdout)
    file_handler = logging.FileHandler("dns_updater.log")
    console_handler.setLevel(level)
    file_handler.setLevel(logging.WARNING)

    # Create formatters and add them to the handlers; the file format also
    # records the line number of the logging call.
    logger_format = logging.Formatter(
        "%(asctime)s | %(filename)s | %(levelname)s | %(message)s"
    )
    file_format = logging.Formatter(
        "%(asctime)s | %(filename)s(%(lineno)d) | %(levelname)s | %(message)s"
    )
    file_handler.setFormatter(file_format)
    console_handler.setFormatter(logger_format)

    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    return logger


# Module-wide logger shared by every function in this file.
LOGGER = create_logger()
def get_dns_record(zone_id, domain_name):
    """Get the current DNS record for the specified domain.

    Args:
        zone_id: Cloudflare zone identifier placed in the request URL.
        domain_name: Record name sent as the ``name`` query parameter.

    Returns:
        The first entry of the response's ``result`` list, or ``None`` when the
        request fails, the status code is not 200, or no record matches.
    """
    LOGGER.info("Fetching record for '%s' in zone '%s'.", domain_name, zone_id)
    headers = {
        "Authorization": "Bearer " + CF_API_TOKEN,
        "Content-Type": "application/json",
    }
    params = {
        "name": domain_name,
    }

    # A network failure or a non-JSON body must not propagate: this runs inside
    # the scheduler loop, and an uncaught exception would stop the whole updater.
    try:
        response = requests.get(
            f"{BASE_URL}zones/{zone_id}/dns_records",
            headers=headers,
            params=params,
            timeout=60,
        )
        payload = response.json()
    except (requests.exceptions.RequestException, ValueError) as exc:
        LOGGER.error(
            "Failed to fetch data for '%s'. Response: %s", domain_name, exc
        )
        return None

    if response.status_code != 200:
        LOGGER.error(
            "Failed to fetch data for '%s'. Response: %s", domain_name, payload
        )
        return None

    records = payload.get("result") or []
    if not records:
        # Previously silent; surface it so a misspelled name is noticeable.
        LOGGER.warning("No DNS record found for '%s'.", domain_name)
        return None

    LOGGER.info("Successfully fetched data for '%s'.", domain_name)
    return records[0]
def update_dns_record(record, content):
    """Update the content of an existing DNS record.

    Args:
        record: Record dict; its ``zone_id`` and ``id`` build the request URL,
            and ``name`` and ``type`` are used in the success log line.
        content: New value sent as the record's ``content`` field.

    Returns:
        None. The outcome is reported through the logger only.
    """
    headers = {
        "Authorization": "Bearer " + CF_API_TOKEN,
        "Content-Type": "application/json",
    }
    data = {"content": content}

    # Keep network failures inside this function: it is called from the
    # scheduler loop, where an uncaught exception would stop the updater.
    try:
        response = requests.patch(
            f"{BASE_URL}zones/{record['zone_id']}/dns_records/{record['id']}",
            json=data,
            headers=headers,
            timeout=30,
        )
    except requests.exceptions.RequestException as exc:
        LOGGER.error("Failed to update DNS record: %s", exc)
        return

    if response.status_code == 200:
        LOGGER.info(
            "DNS record updated successfully: %s (%s) -> %s",
            record["name"],
            record["type"],
            content,
        )
        return

    # Error bodies are not guaranteed to be JSON; fall back to the raw text.
    try:
        details = response.json()
    except ValueError:
        details = response.text
    LOGGER.error("Failed to update DNS record: %s", details)
def read_zones_from_file(json_file_path, zone_id):
    """Load the list of zones (and their domains) from a JSON file.

    A zone whose ``id`` contains a ``$`` character gets its id replaced by
    ``zone_id``. Every domain entry is then tagged with the ``zone_id`` of the
    zone it belongs to.

    Args:
        json_file_path: Path of the JSON file; it must hold a top-level
            ``zones`` list.
        zone_id: Zone id substituted for ids containing ``$``.

    Returns:
        The ``zones`` list from the file, modified in place as described.
    """
    with open(json_file_path, "r", encoding="utf-8") as handle:
        zones = json.load(handle)["zones"]

    for entry in zones:
        if "$" in entry["id"]:
            entry["id"] = zone_id
        resolved_id = entry["id"]
        for item in entry["domains"]:
            item["zone_id"] = resolved_id
        LOGGER.info("Sucessfully read zone %s.", entry)

    return zones
def get_dns_records_by_name(zones):
    """Look up the DNS record of every domain listed in ``zones``.

    Args:
        zones: List of zone dicts, each with a ``domains`` list whose entries
            carry ``zone_id`` and ``name``.

    Returns:
        The records that were found, in zone/domain order; domains for which
        the lookup returned ``None`` are left out.
    """
    LOGGER.info("Trying to fetch records for %s zones.", len(zones))
    lookups = (
        get_dns_record(domain["zone_id"], domain["name"])
        for zone in zones
        for domain in zone["domains"]
    )
    return [found for found in lookups if found is not None]
def get_dns_records_by_comment(zone_id, comment_key):
    """Fetch all DNS records whose comment contains ``comment_key``.

    Args:
        zone_id: Cloudflare zone identifier placed in the request URL.
        comment_key: Text sent as the ``comment.contains`` query parameter.

    Returns:
        The response's ``result`` list, or an empty list when the request
        fails, the status code is not 200, or nothing matches.
    """
    headers = {
        "Authorization": "Bearer " + CF_API_TOKEN,
        "Content-Type": "application/json",
    }
    params = {
        "comment.contains": comment_key,
    }
    LOGGER.info("Fetching DNS record with comment key: %s", comment_key)

    # A network failure or a non-JSON body must not propagate: this runs inside
    # the scheduler loop, and an uncaught exception would stop the updater.
    try:
        response = requests.get(
            f"{BASE_URL}zones/{zone_id}/dns_records",
            headers=headers,
            params=params,
            timeout=60,
        )
        payload = response.json()
    except (requests.exceptions.RequestException, ValueError) as exc:
        LOGGER.error("Failed to get dns_records with comment key: %s", exc)
        return []

    if response.status_code != 200:
        LOGGER.error("Failed to get dns_records with comment key: %s", payload)
        return []

    records = payload.get("result")
    if records:
        return records

    LOGGER.warning(
        "Request was successful but no valid domains were found: %s",
        payload,
    )
    return []
def get_public_ip():
    """Get the public IP address from the list of IP checking services.

    Services in ``IP_CHECK_SERVICES`` are tried in order. A service is skipped
    when the request fails, the status code is not 200, or the body is not a
    valid IP address.

    Returns:
        The stripped response text of the first service that returns a valid
        address, or ``None`` when none of them does.
    """
    # Function-scope import so this block does not depend on the file header.
    import ipaddress

    for service in IP_CHECK_SERVICES:
        try:
            response = requests.get(service, timeout=5)
        except requests.exceptions.RequestException as exc:
            LOGGER.warning("IP check service %s failed: %s", service, exc)
            continue

        if response.status_code != 200:
            continue

        candidate = response.text.strip()
        # The result is written into DNS records, so never trust an arbitrary
        # 200 body (error page, captive portal, empty response).
        try:
            ipaddress.ip_address(candidate)
        except ValueError:
            LOGGER.warning(
                "IP check service %s returned an invalid address.", service
            )
            continue
        return candidate

    return None
def is_connected():
    """Check if there is an active internet connection.

    Resolves ``www.cloudflare.com`` and opens a TCP connection to port 80 with
    a 2 second timeout.

    Returns:
        ``True`` when the connection succeeds, ``False`` (after logging the
        error) when resolution or the connection fails.
    """
    try:
        host = socket.gethostbyname("www.cloudflare.com")
        # The socket is only a probe; the context manager closes it right away
        # instead of leaving it open on every scheduled run.
        with socket.create_connection((host, 80), 2):
            return True
    except socket.error as exc:
        LOGGER.error("Socket error: %s", exc)
    return False
def check_and_update_dns():
    """Run one check-and-update cycle.

    The run is skipped (with an error log) when there is no connectivity, when
    ``CF_ZONE_ID`` or ``CF_API_TOKEN`` is unset, or when neither
    ``DNS_RECORD_COMMENT_KEY`` nor ``DOMAINS_FILE_PATH`` is set. Otherwise the
    target records are collected by comment key if one is configured, else
    from the domains file, and every record whose ``content`` differs from the
    current public IP is updated.
    """
    LOGGER.info("Run triggered by schedule.")

    if not is_connected():
        LOGGER.error("No internet connection. Skipping check and update.")
        return

    if CF_ZONE_ID is None:
        LOGGER.error("CF_ZONE_ID: At least one zone id must be set.")
        return

    if CF_API_TOKEN is None:
        LOGGER.error(
            "CF_API_TOKEN Missing: You have to provide your Cloudflare API Token."
        )
        return

    if DNS_RECORD_COMMENT_KEY is None and DOMAINS_FILE_PATH is None:
        LOGGER.error(
            "DNS_RECORD_COMMENT_KEY and DOMAINS_FILE_PATH are missing,"
            + " don't know which domains to update"
        )
        return

    public_ip = get_public_ip()

    if DNS_RECORD_COMMENT_KEY is not None:
        LOGGER.info(
            "Using DNS_RECORD_COMMENT_KEY='%s' to find DNS records to update.",
            DNS_RECORD_COMMENT_KEY,
        )
        domain_records = get_dns_records_by_comment(CF_ZONE_ID, DNS_RECORD_COMMENT_KEY)
    else:
        LOGGER.info(
            "Using DOMAINS_FILE_PATH='%s' to find DNS records to update.",
            DOMAINS_FILE_PATH,
        )
        # A missing or malformed file must not raise out of the scheduled job,
        # otherwise the main loop stops for good.
        try:
            zones = read_zones_from_file(DOMAINS_FILE_PATH, CF_ZONE_ID)
        except (OSError, ValueError, KeyError, TypeError) as exc:
            LOGGER.error(
                "Failed to load domains from '%s': %s", DOMAINS_FILE_PATH, exc
            )
            return
        domain_records = get_dns_records_by_name(zones)

    # Drop missing records once, up front. The old loop read record["name"]
    # before testing for None, so a None entry would have raised TypeError.
    domain_records = [record for record in domain_records if record is not None]

    LOGGER.info(
        "Found %s valid domains for update: [%s]",
        len(domain_records),
        ", ".join(record["name"] for record in domain_records),
    )

    if not public_ip:
        LOGGER.error("Failed to retrieve public IP. Skipping check and update.")
        return

    for record in domain_records:
        if public_ip != record["content"]:
            update_dns_record(record, public_ip)
        else:
            LOGGER.info(
                "IP addresses are the same for %s. No update needed.",
                record["name"],
            )
LOGGER.info("Schedule is set at %s minutes", SCHEDULE_MINUTES)

# Register the recurring job, then run it once right away so the first check
# does not have to wait for a full interval.
update_job = schedule.every(SCHEDULE_MINUTES).minutes.do(check_and_update_dns)
update_job.run()

# Main loop: poll the scheduler once per second, forever.
while True:
    schedule.run_pending()
    time.sleep(1)