forked from awslabs/open-data-registry
-
Notifications
You must be signed in to change notification settings - Fork 2
/
ext.py
124 lines (88 loc) · 3.24 KB
/
ext.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import re
import yaml
import requests
import time
from urllib3.exceptions import InsecureRequestWarning
# Suppress the warning on Verify=False requests
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
tags = yaml.safe_load(open("tags.yaml"))
tags.append("aws-pds")
adx_categories = yaml.safe_load(open("adx_categories.yaml"))
resources = yaml.safe_load(open("resources.yaml"))
services = yaml.safe_load(open("services.yaml"))
arn_regex = re.compile(r"^arn:(aws|aws-iso):.+:.*:.*:.+$")
host_regex = re.compile(r"^(https?:\/\/)?([\da-z\.-]+)\.([a-z\.]{2,63})(\/.*)*\/?$")
controlled_access_regex = re.compile(
r"^(https?:\/\/)?([\da-z\.\-\_]+)\.([a-z\.]{2,63})(\/.*)*\/?$"
)
explore_regex = re.compile(r"^\[.+\]\(https?:\/\/[\w\d.\-\/#\?\&\%=]+\)$")
def retry(howmany):
def tryFunc(func):
def f(*args, **kwargs):
attempts = 0
while attempts < howmany:
try:
return func(*args, **kwargs)
except:
attempts += 1
time.sleep(1.0 * 4.0 ** attempts)
if attempts >= howmany:
raise
return f
return tryFunc
# Check if provided tags are in tags.yaml
def ext_tags(value, rule_obj, path):
if value not in tags:
print("Invalid tag!", value)
return False
# If we're here, all tags were ok
return True
# Check if provided ADX categories are in adx_categories.yaml
def ext_adx_categories(value, rule_obj, path):
if value not in adx_categories:
print("Invalid ADX category!", value)
return False
# If we're here, all categories were ok
return True
# Check if provided resources are in resources.yaml
def ext_resources(value, rule_obj, path):
if value not in resources:
print("Invalid resource!", value)
return False
# If we're here, all resources were ok
return True
# Check if provided services are in services.yaml
def ext_services(value, rule_obj, path):
if value not in services:
print("Invalid service!", value)
return False
# If we're here, all services were ok
return True
# Check to make sure we have a valid arn
def ext_resources_arn(value, rule_obj, path):
if not re.fullmatch(arn_regex, value):
print(
"ARN '{}' is not valid, it should like like arn:aws:s3:::yourbucket".format(
value
)
)
return False
return True
# Check to make sure we have a valid host
def ext_resources_host(value, rule_obj, path):
if not re.fullmatch(host_regex, value):
print("Host '{}' is not valid".format(value))
return False
return True
# Check to make sure we have a valid controlled access string
def ext_resources_controlled_access(value, rule_obj, path):
if not re.fullmatch(controlled_access_regex, value):
print("Controlled Access string '{}' is not valid".format(value))
return False
return True
# Check to make sure we have a valid array of links
def ext_resources_explore(value, rule_obj, path):
if not re.fullmatch(explore_regex, value):
print("Explore string '{}' is not a valid link".format(value))
return False
return True