-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathhelper.py
271 lines (243 loc) · 11.5 KB
/
helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
#!/usr/bin/env python3
# Copyright © 2020 Red Hat Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Author: Yusuf Zainee <[email protected]>
#
"""Helper functions for the graph sync operations."""
import requests
import logging
import os
import boto3
import json
import re
from unified_range import api
from f8a_version_comparator.comparable_version import ComparableVersion
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class Helper:
    """Helper class with some utility functions.

    Wraps S3 report storage, gremlin/data-importer HTTP calls, the runtime
    feature flags read from environment variables, and the version-range
    matching used to determine which package versions a vulnerability affects.
    """

    def __init__(self):
        """Init method for helper class."""
        # S3 bucket and credentials used for the vulnerability report files.
        self.CVE_BUCKET = os.environ.get("REPORT_BUCKET_NAME", '')
        self.AWS_KEY = os.environ.get("AWS_S3_ACCESS_KEY_ID_REPORT_BUCKET", '')
        self.AWS_SECRET = os.environ.get("AWS_S3_SECRET_ACCESS_KEY_REPORT_BUCKET", '')
        self.AWS_REGION = os.environ.get("AWS_S3_REGION", "us-east-1")
        # Endpoint of the data importer service used by make_api_call().
        self.HOST = os.environ.get('BAYESIAN_DATA_IMPORTER_SERVICE_HOST', 'bayesian-data-importer')
        self.PORT = os.environ.get('BAYESIAN_DATA_IMPORTER_SERVICE_PORT', '9192')
        self.s3_resource = boto3.resource('s3', aws_access_key_id=self.AWS_KEY,
                                          aws_secret_access_key=self.AWS_SECRET,
                                          region_name=self.AWS_REGION)

    def get_session_retry(self, retries=3, backoff_factor=0.2,
                          status_forcelist=(404, 500, 502, 504),
                          session=None):
        """Set HTTP Adapter with retries to session.

        :param retries: total/read/connect retry count.
        :param backoff_factor: urllib3 exponential backoff factor.
        :param status_forcelist: HTTP status codes that trigger a retry.
        :param session: optional existing session to mount the adapter on.
        :return: a requests.Session with the retrying adapter mounted on http://.
        """
        session = session or requests.Session()
        retry = Retry(total=retries, read=retries, connect=retries,
                      backoff_factor=backoff_factor, status_forcelist=status_forcelist)
        adapter = HTTPAdapter(max_retries=retry)
        session.mount('http://', adapter)
        return session

    def execute_gremlin_dsl(self, payloads):
        """Execute the gremlin query and return the response.

        :param payloads: JSON-serializable gremlin query payload.
        :return: the parsed JSON response when it contains result data,
                 None on empty data, non-200 status, or any exception.
        """
        url = "http://{host}:{port}".format(
            host=os.environ.get("BAYESIAN_GREMLIN_HTTPINGESTION_SERVICE_HOST", "localhost"),
            port=os.environ.get("BAYESIAN_GREMLIN_HTTPINGESTION_SERVICE_PORT", "8181"))
        try:
            payload = json.dumps(payloads)
            resp = self.get_session_retry().post(
                url, headers={'Content-Length': str(len(payload))}, data=payload)
            if resp.status_code == 200:
                json_response = resp.json()
                data = json_response.get("result", {}).get("data", [])
                if len(data) > 0:
                    logger.info("Success")
                else:
                    # An empty result set means nothing was updated.
                    logger.info("Something went wrong. No updates happened.")
                    return None
                return json_response
            else:
                logger.error("HTTP error {}. Error retrieving Gremlin data.".format(
                    resp.status_code))
                return None
        except Exception as e:
            # Best-effort call: log and signal failure with None.
            logger.error(e)
            return None

    def store_json_content(self, content, obj_key):
        """Store the report content to the S3 storage.

        :param content: JSON-serializable object to store.
        :param obj_key: S3 object key within CVE_BUCKET.
        """
        try:
            logger.info('Storing the data into the S3 file %s', obj_key)
            self.s3_resource.Object(self.CVE_BUCKET, obj_key).put(
                Body=json.dumps(content, indent=2).encode('utf-8'))
        except Exception as e:
            # Storage is best-effort; log the full traceback and continue.
            logger.exception('%r', e)

    def _retrieve_dict(self, object_key):
        """Retrieve a dictionary stored as JSON from S3."""
        return json.loads(self._retrieve_blob(object_key).decode('utf-8'))

    def _retrieve_blob(self, object_key):
        """Retrieve remote object content as raw bytes."""
        return self.s3_resource.Object(self.CVE_BUCKET, object_key).get()['Body'].read()

    def read_data_from_s3(self, name, loc):
        """Read the snyk data from S3.

        :param name: file name without the ".json" extension.
        :param loc: S3 key prefix (must include any trailing separator).
        :return: the parsed dict, or False when retrieval/parsing fails.
        """
        try:
            filename = loc + name + ".json"
            logger.info('Retrieving the data from the S3 file %s', filename)
            return self._retrieve_dict(filename)
        except Exception as e:
            logger.error(e)
            return False

    def make_api_call(self, payload, mode):
        """Make an API call to data importer.

        :param payload: dict sent as the JSON request body; must contain
                        'package' for PUT and 'id' for DELETE.
        :param mode: "PUT" to ingest CVEs or "DELETE" to remove them.
        :return: "success", "delayed" (504, processed in background) or
                 "failed"; "failed" is also returned for an unknown mode
                 and on any exception.
        """
        try:
            api_url = "http://" + self.HOST + ":" + self.PORT
            headers = {'Content-type': 'application/json'}
            if mode == "PUT":
                response = requests.put('{}/api/v1/snyk-cves'.format(api_url), headers=headers,
                                        data=json.dumps(payload))
                msg = "ingested"
                key = payload['package']
            elif mode == "DELETE":
                response = requests.delete('{}/api/v1/snyk-cves'.format(api_url), headers=headers,
                                           data=json.dumps(payload))
                msg = "deleted"
                key = payload['id']
            else:
                # Bug fix: an unknown mode previously fell through with
                # response = "" and an undefined `key`, raising an
                # AttributeError that was swallowed by the broad except below.
                logger.error("Unknown mode %s for data importer call.", mode)
                return "failed"
            if response.status_code == 200:
                logger.info("CVEs for {p} successfully {m}".format(p=key, m=msg))
                return "success"
            elif response.status_code == 504:
                logger.info("Operation for {p} is taking time. "
                            "Will be {m} in the background".format(p=key, m=msg))
                return "delayed"
            else:
                logger.info("Error while {m} CVEs for {p}".format(p=key, m=msg))
                logger.info("Status Code {}".format(response.status_code))
                return "failed"
        except Exception:
            logger.error("API call to data importer failed.")
            # Bug fix: previously fell off the end and returned None,
            # inconsistent with the documented string return values.
            return "failed"

    def _env_flag(self, name, default='false'):
        """Return True when the environment variable *name* holds a truthy value."""
        return os.environ.get(name, default).lower() in ('1', 'yes', 'true')

    def is_dry_run(self):
        """Return True if this is a dry run."""
        # Set this value to true if you want the entire operation to run, but not the ingestion.
        return self._env_flag('SNYK_DRY_RUN')

    def force_run_ingestion(self):
        """Return True if forced ingestion mode is on."""
        # Set this value when you want to run the ingestion forcefully (ignores runtime).
        return self._env_flag('SNYK_INGESTION_FORCE_RUN')

    def is_delta_mode_on(self):
        """Return True if the delta feed mode is on."""
        # Set this value if you want to run only in the diff mode.
        return self._env_flag('SNYK_DELTA_FEED_MODE')

    def ingestion_run_time(self):
        """Return the time (hour string) when ingestion needs to run."""
        # Set the time at which you want the ingestion to run.
        return os.environ.get('SNYK_INGESTION_RUN_TIME', '12')

    def is_snyk_retry_disabled(self):
        """Return True when the snyk retry mechanism is disabled."""
        # Set this value to true if you want to disable the snyk retry mechanism
        # (the original comment said "false", contradicting the code).
        return self._env_flag('DISABLE_SNYK_INGESTION_RETRY')

    def is_custom_mode_enabled(self):
        """Return True if the custom (bootstrap) mode is enabled."""
        return self._env_flag('SNYK_CUSTOM_MODE')

    def is_complete_vuln_mode(self):
        """Return True if the complete vulnerability mode is enabled."""
        return self._env_flag('COMPLETE_VULN_MODE')

    def is_vuln_mode_disabled(self):
        """Return True if collecting vuln data into the S3 file is disabled."""
        # Set this value if you do NOT want the vuln data to be collected in the S3 file.
        return self._env_flag('DISABLE_VULN_MODE')

    def get_version_rules(self, vuln_versions):
        """Version rules for all eco.

        For all the vulnerable versions information that we get, we need to
        create comparable version objects so that we can apply these rules on
        top of all the available versions of a pkg in the market.

        :param vuln_versions: list of range strings, e.g. [">=1.0 <2.0 || =3.1"].
        :return: list of rule lists; each rule is a dict with 'key'
                 (relational operator or '*'/'=') and 'val' (ComparableVersion
                 or "" for '*').
        """
        rules = []
        # Matches a version operand (digits, letters, '_', '.', '-').
        regex_op = "[0-9a-zA-Z\\_\\.\\-]+"
        # Matches a relational operator (or '*' wildcard).
        regex_vr = "[<>=*]+"
        for version in vuln_versions:
            version = version.replace(" ", "")
            # Golang pseudo-version suffix is irrelevant for comparison.
            version = version.replace("+incompatible", "")
            # '||' separates independent (OR-ed) sub-ranges.
            sub_vers = version.split('||')
            for sub_ver in sub_vers:
                tmp = []
                # Split by operators -> operands; split by operands -> operators.
                vr_relations = re.split(regex_vr, sub_ver)
                op_relations = re.split(regex_op, sub_ver)
                # Single affected version.
                if len(vr_relations) == 1:
                    tmp.append({
                        'key': "=",
                        'val': ComparableVersion(vr_relations[0])
                    })
                # All versions affected.
                elif len(op_relations) == 1 and op_relations[0] == '*':
                    tmp.append({
                        'key': "*",
                        'val': ""
                    })
                else:
                    # Pair each operator with the operand that follows it.
                    for i in range(len(op_relations) - 1):
                        tmp.append({
                            'key': op_relations[i],
                            'val': ComparableVersion(vr_relations[i + 1])
                        })
                rules.append(tmp)
        return rules

    def _is_relation_applicable(self, key, version, rule):
        """Check if the version satisfies the relation.

        :param key: relational operator ('<', '>', '=', '<=', '>=', '*').
        :param version: version string to test.
        :param rule: ComparableVersion to compare against (ignored for '*').
        :return: True when the relation holds; False for unknown operators.
        """
        if key == '<':
            return ComparableVersion(version) < rule
        elif key == '>':
            return ComparableVersion(version) > rule
        elif key == '=':
            return ComparableVersion(version) == rule
        elif key == '<=':
            return ComparableVersion(version) <= rule
        elif key == '>=':
            return ComparableVersion(version) >= rule
        elif key == '*':
            # Wildcard: every version matches.
            return True
        return False

    def get_affected_versions(self, rules, versions):
        """Get affected versions for maven, pypi, npm.

        :param rules: rule lists as produced by get_version_rules().
        :param versions: all known versions of the package.
        :return: de-duplicated list (unordered) of versions matching any rule.
        """
        affected = []
        for ver in versions:
            for rule in rules:
                # If there is a singular rule Ex >=2.1.1
                if len(rule) == 1:
                    if self._is_relation_applicable(rule[0]['key'], ver, rule[0]['val']):
                        affected.append(ver)
                # If there are 2 rules Ex >=2.1.1 & <2.1.5
                elif len(rule) == 2:
                    key0 = rule[0]['key']
                    key1 = rule[1]['key']
                    first = self._is_relation_applicable(key0, ver, rule[0]['val'])
                    second = self._is_relation_applicable(key1, ver, rule[1]['val'])
                    if first and second:
                        affected.append(ver)
                    else:
                        # Inclusive bounds still match on exact equality even
                        # when the other half of the range does not hold.
                        if '=' in key0:
                            if self._is_relation_applicable("=", ver, rule[0]['val']):
                                affected.append(ver)
                        elif '=' in key1:
                            if self._is_relation_applicable("=", ver, rule[1]['val']):
                                affected.append(ver)
        return list(set(affected))

    def get_semver_versions(self, versions):
        """Convert to semver version format.

        :param versions: iterable of version strings.
        :return: list of semver strings produced by unified_range's converter.
        """
        return [api.to_semver(ver) for ver in versions]