forked from pvxe/nftables-geoip
-
Notifications
You must be signed in to change notification settings - Fork 0
/
nft_geoip.py
executable file
·325 lines (272 loc) · 11.7 KB
/
nft_geoip.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
#!/usr/bin/env python3
#
# (C) 2019 by Jose M. Guisado <[email protected]>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
"""
Generate files containing nftables geoip mappings and definitions.
This script is intended to be executed and not imported.
"""
from collections import namedtuple
from datetime import datetime
import argparse
import csv
import gzip
import ipaddress
import os
import requests
import shutil
import sys
import time
import unicodedata
DEFAULT_FILE_LOCATION = 'location.csv'
DEFAULT_FILE_ADDRESS = 'dbip.csv'
# entries in location csv
GeoEntry = namedtuple('GeoEntry',
'name, '
'alpha_2, '
'alpha_3, '
'country_code, '
'iso_3166_2, '
'region, '
'sub_region, '
'intermediate_region, '
'region_code, '
'sub_region_code, '
'intermediate_region_code')
# entries in DB-IP geoip csv
NetworkEntry = namedtuple('NetworkEntry',
'network_first, '
'network_last, '
'country_alpha_2')
def strip_accent(text):
"""
Remove accented characters. Convert to ASCII.
"""
return ''.join(char for char in unicodedata.normalize('NFKD', text)
if unicodedata.category(char) != 'Mn')
def make_location_dicts():
"""
Returns three formatted dictionaries with following key/value:
- iso_3166_2 (numeric) : country name
- country_name : continent/region name
- country_name : alpha2 name
"""
country_dict = {}
continent_dict = {}
country_alpha_dict = {}
next(args.locations) # Omit license notice
next(args.locations) # Omit csv header
for geo_entry in map(GeoEntry._make, csv.reader(args.locations)):
country_name = normalize(geo_entry.name)
country_dict[normalize(geo_entry.country_code.lstrip('0'))] = country_name
continent_dict[country_name] = normalize(geo_entry.region)
country_alpha_dict[country_name] = normalize(geo_entry.alpha_2)
return country_dict, continent_dict, country_alpha_dict
def normalize(value):
"""
Strip accents and replace special characters.
Used for keys and values inside location dictionaries.
"""
return strip_accent(value).lower().replace(' ', '_').replace('[', '').replace(']', '').replace(',', '')
def write_geoip_location(country_dict, continent_dict, country_alpha_dict):
"""
Write country iso code definitions, separating files for each continent
(eg. geoip-def-asia.nft, etc.)
Also writes a definition file with all countries in "geoip-def-all.nft"
"""
for continent in continent_dict.values():
with open(args.dir+'geoip-def-{}.nft'.format(continent), 'w') as output_file:
try:
for country, iso in [(country, iso)
for iso, country
in country_dict.items()
if continent_dict[country] == continent]:
output_file.write('define {} = {}\n'.format(country_alpha_dict[country].upper(), iso))
except KeyError as e:
# 'ZZ' is used for an unknown country, so it won't match
# any country in the location file. Pass and do not write
# to output file
pass
with open(args.dir+'geoip-def-all.nft', 'w') as output_file:
for iso, country in country_dict.items():
output_file.write('define {} = {}\n'.format(country_alpha_dict[country].upper(), iso))
output_file.write('\n' * 2)
output_file.write('define africa = 1\n'
'define asia = 2\n'
'define europe = 3\n'
'define americas = 4\n'
'define oceania = 5\n'
'define antarctica = 6\n')
output_file.write('\n')
output_file.write('map continent_code {\n'
'\ttype mark : mark\n'
'\tflags interval\n'
'\telements = {\n\t\t')
output_file.write(',\n\t\t'.join(make_lines2({country_alpha_dict[country].upper():v
for country, v
in continent_dict.items()})))
output_file.write('\n')
output_file.write('\t}\n')
output_file.write('}\n')
def check_ipv4(addr):
"""
Returns true if a string is representing an ipv4 address.
False otherwise.
"""
try:
ipaddress.IPv4Address(addr)
return True
except ipaddress.AddressValueError:
return False
def make_geoip_dict(country_alpha_dict):
"""
Read DB-IP network ranges and creates geoip4 and geoip6 dictionaries
mapping ip ranges over country alpha-2 codes
"""
# XXX: country_alpha_dict is used to prune countries not appearing
# inside location.csv (eg. Kosovo)
#
# Kosovo could be added to location.csv with a "virtual" code
# Kosovo,XK,XKX,999,ISO 3166-2:XK,Europe,"","","","",""
#
# Or instead use geonames.
geoip4_dict = {}
geoip6_dict = {}
known_alphas = country_alpha_dict.values()
for net_entry in map(NetworkEntry._make, csv.reader(args.blocks)):
alpha2 = net_entry.country_alpha_2.lower()
# 'ZZ' or codes not appearing in location.csv will be ignored
if (alpha2 == 'zz') or (alpha2 not in known_alphas):
continue
# There are entries in DB-IP csv for single addresses which
# are represented as same start and end address range.
# nftables does not accept same start/end address in ranges
if net_entry.network_first == net_entry.network_last:
k = net_entry.network_first
else:
k = '-'.join((net_entry.network_first, net_entry.network_last))
if check_ipv4(net_entry.network_first):
geoip4_dict[k] = alpha2
else:
geoip6_dict[k] = alpha2
return geoip4_dict, geoip6_dict
def make_lines1(dictionary):
"""
For each entry in the dictionary maps to a line for nftables dict files
using key literal and value as nft variable.
"""
return ['{} : ${}'.format(k, v) for k, v in dictionary.items()]
def make_lines2(dictionary):
"""
For each entry in the dictionary maps to a line for nftables dict files
using key and value as nft variables.
"""
return ['${} : ${}'.format(k, v) for k, v in dictionary.items()]
def write_nft_header(f):
"""
Writes nft comments about generation date and db-ip copyright notice.
"""
f.write("# Generated by nft_geoip.py on {}\n"
.format(datetime.now().strftime("%a %b %d %H:%M %Y")))
f.write("# IP Geolocation by DB-IP (https://db-ip.com) licensed under CC-BY-SA 4.0\n\n")
def write_geoip_maps(geoip4_dict, geoip6_dict):
"""
Write ipv4 and ipv6 geoip nftables maps to corresponding output files.
"""
with open(args.dir+'geoip-ipv4.nft', 'w') as output_file:
write_nft_header(output_file)
output_file.write('map geoip4 {\n'
'\ttype ipv4_addr : mark\n'
'\tflags interval\n'
'\telements = {\n\t\t')
output_file.write(',\n\t\t'.join(make_lines1({k:v.upper() for k,v in geoip4_dict.items()})))
output_file.write('\n')
output_file.write('\t}\n')
output_file.write('}\n')
with open(args.dir+'geoip-ipv6.nft', 'w') as output_file:
write_nft_header(output_file)
output_file.write('map geoip6 {\n'
'\ttype ipv6_addr : mark\n'
'\tflags interval\n'
'\telements = {\n\t\t')
output_file.write(',\n\t\t'.join(make_lines1({k:v.upper() for k,v in geoip6_dict.items()})))
output_file.write('\n')
output_file.write('\t}\n')
output_file.write('}\n')
def create_parser():
"""
Returns cli argument parser
"""
parser = argparse.ArgumentParser(description=__doc__,
prog='nft_geoip')
parser.add_argument('--file-location',
type=argparse.FileType('r'),
help='path to csv file containing information about countries. '
f'(default: {DEFAULT_FILE_LOCATION})',
default=DEFAULT_FILE_LOCATION,
required=False,
dest='locations')
parser.add_argument('--file-address',
type=argparse.FileType('r'),
help='path to db-ip.com lite cvs file with ipv4 and ipv6 geoip information '
f'(default: {DEFAULT_FILE_ADDRESS})',
required=False,
dest='blocks')
parser.add_argument('-d', '--download', action='store_true',
help='fetch geoip data from db-ip.com. This option overrides --file-address',
required=False,
dest='download')
parser.add_argument('-o', '--output-dir',
help='Existing directory where downloads and output will be saved. '
'(default: working directory)',
required=False,
dest='dir')
return parser
if __name__ == '__main__':
parser = create_parser()
args = parser.parse_args()
if not args.dir:
args.dir = ''
elif not os.path.isdir(args.dir):
parser.print_help()
sys.exit('\nSpecified output directory does not exist or is not a directory')
else:
# Add trailing / for folder path if there isn't
args.dir += '/' if args.dir[-1:] != '/' else ''
if args.download:
filename = args.dir+'dbip.csv.gz'
url = 'https://download.db-ip.com/free/dbip-country-lite-{}.csv.gz'.format(time.strftime("%Y-%m"))
print('Downloading db-ip.com geoip csv file...')
r = requests.get(url, stream=True)
if r.status_code == 200:
with open(filename, 'wb') as f:
r.raw.decode_content = True
shutil.copyfileobj(r.raw, f)
else:
sys.exit('Error trying to download DB-IP lite geoip csv file. Bailing out...')
with gzip.open(filename, 'rb') as f_in:
with open(args.dir+'dbip.csv', 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
os.remove(filename)
# Update blocks arg with the downloaded file
args.blocks = open(args.dir+'dbip.csv', 'r', encoding='utf-8')
if not (args.blocks or args.locations):
parser.print_help()
sys.exit('Missing required address and location csv files.')
if not args.blocks:
parser.print_help()
sys.exit('Missing geoip address csv file. You can instead download it using --download.')
if not args.locations:
parser.print_help()
sys.exit('Missing country information csv file')
country_dict, continent_dict, country_alpha_dict = make_location_dicts()
print('Writing country definition files...')
write_geoip_location(country_dict, continent_dict, country_alpha_dict)
print('Writing nftables maps (geoip-ipv{4,6}.nft)...')
geoip4_dict, geoip6_dict = make_geoip_dict(country_alpha_dict)
write_geoip_maps(geoip4_dict, geoip6_dict)
print('Done!')