-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathDTP_API.py
370 lines (306 loc) · 14.3 KB
/
DTP_API.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
# -*- coding: utf-8 -*-`
# Copyright (c) Centre Inria d'Université Côte d'Azur, University of Cambridge 2023.
# Authors: Kacper Pluta <[email protected]>, Alwyn Mathew <[email protected]>
# This file cannot be used without a written permission from the author(s).
"""
The file is a collection of methods used to interact with the DTP.
For more information, contact the author(s) listed above.
"""
import argparse
import logging
import requests
import validators
from file_read_backwards import FileReadBackwards
from tqdm import tqdm
try:
from DTP_config import DTPConfig
except ModuleNotFoundError:
import sys
sys.path.append('DTP_API')
from DTP_config import DTPConfig
from dtp_apis.count_DTP_API import CountAPI
from dtp_apis.create_DTP_API import CreateAPI
from dtp_apis.fetch_DTP_API import FetchAPI
from dtp_apis.link_DTP_API import LinkAPI
from dtp_apis.revert_DTP_API import RevertAPI
from dtp_apis.send_DTP_API import SendAPI
from dtp_apis.update_DTP_API import UpdateAPI
from helpers import logger_global, get_info_from_log
class DTPApi(FetchAPI, CountAPI, CreateAPI, LinkAPI, RevertAPI, SendAPI, UpdateAPI):
"""
Base API class for mixin classes.
Attributes
----------
simulation_mode : bool
if True then no changes to the database are performed.
DTP_CONFIG : class
an instance of DTP_Config
Methods
-------
init_logger(session_file)
None
init_external_logger(session_logger)
None
TODO: move to a new class all the methods, which are used for sending requests
post_general_request(payload, url, headers)
returns dictionary created from JSON
general_guarded_request(req_type, payload, url, headers)
returns dictionary created from JSON
post_guarded_request(payload, url, headers)
returns dictionary created from JSON
put_guarded_request(payload, url, headers)
returns dictionary created from JSON
pretty_http_request_to_string(req)
returns request string
"""
def __init__(self, dtp_config, simulation_mode=False):
"""
Parameters
----------
dtp_config : DTP_Config, obligatory
an instance of DTP_Config
simulation_mode : bool, optional
if set to True then method changing
the database are not send.
"""
self.simulation_mode = simulation_mode
self.DTP_CONFIG = dtp_config
self.session_logger = None
self.log_markers_node_classes = {
'new_element': 'NEW_ELEMENT_IRI',
'new_defect': 'NEW_DEFECT_IRI',
'new_action': 'NEW_ACTION_IRI',
'new_operation': 'NEW_OPERATION_IRI',
'new_constr': 'NEW_CONSTRUCTION_IRI',
'new_kpi': 'NEW_KPI_IRI'}
other_log_markers = {'link_elem_blob': 'NEW_LINK_ELEMENT_BLOB',
'new_blob': 'NEW_BLOB',
'update_asdesigned_param': 'UPDATE_isAsDesigned_PARAM_NODE_OPERATION',
'update_operation': 'UPDATE_OPERATION_IRI',
'update_construction': 'UPDATE_CONSTRUCTION_IRI',
'remove_param': 'REMOVED_PARAM_NODE_OPERATION',
'add_param': 'ADD_PARAM_NODE_OPERATION'}
try:
self.log_markers = self.log_markers_node_classes | other_log_markers
except TypeError: # dictionary merge operator only in python 3.9+
self.log_markers = {**self.log_markers_node_classes, **other_log_markers}
def init_logger(self, session_file):
"""
Method used for initializing a logger used to collect information about session: only node linking
and creation is saved at the moment. The method should be used only for single core processing.
For parallel processing use init_external_logger.
Parameters
----------
session_file: str obligatory
the path to the log file, it does not need to exist.
"""
if len(session_file.strip()) != 0:
print(f"Session log file at {session_file}")
formatter = logging.Formatter('%(asctime)s : %(message)s', datefmt='%d-%b-%y %H:%M:%S')
handler = logging.FileHandler(session_file)
handler.setFormatter(formatter)
self.session_logger = logging.getLogger('session_DTP')
self.session_logger.setLevel(logging.INFO)
self.session_logger.addHandler(handler)
def init_external_logger(self, session_logger):
"""
The method allows for passing an external session logger to the instance of the class.
Parameters
----------
session_logger: Logger (see logging) obligatory
an instance of the logger class.
"""
self.session_logger = session_logger
def post_general_request(self, payload, url=' ', headers=None):
"""
The method allows for sending POST requests to the DTP. This version does not respect the simulation mode.
For a simulation mode respecting version see: __post_guarded_request
Parameters
----------
payload: dict obligatory
the query to be sent to the platform.
url: str optional
the URL used for the HTTPS request
headers: dict optional
the header of the request, if not provided the default one is used.
"""
if headers is None:
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
'Authorization': 'Bearer ' + self.DTP_CONFIG.get_token()
}
session = requests.Session()
if not validators.url(url):
raise Exception("Sorry, the URL is not a valid URL: " + url)
req = requests.Request("POST", url, headers=headers, data=payload)
prepared = req.prepare()
logger_global.info('HTTP request: \n' + self.pretty_http_request_to_string(prepared))
response = session.send(prepared)
logger_global.info('Response code: ' + str(response.status_code))
if response.ok:
return response
else:
logger_global.error(
"The response from the DTP is an error. Check the dev token and/or the domain. Status code: " + str(
response.status_code))
raise Exception(
"The response from the DTP is an error. Check the dev token and/or the domain. Status code: " + str(
response.status_code))
def general_guarded_request(self, req_type, payload, url=' ', headers=None):
"""
The method allows for sending POST requests to the DTP. This version does respect the simulation mode.
For a none simulation mode respecting version see: __post_general_request
Parameters
----------
payload: dict obligatory
the query to be sent to the platform.
url: str optional
the URL used for the HTTPS request
headers: dict optional
the header of the request, if not provided the default one is used.
"""
if headers is None:
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
'Authorization': 'Bearer ' + self.DTP_CONFIG.get_token()
}
req_type_fix = req_type.strip().upper()
if len(req_type_fix) == 0:
Exception("Request type cannot be empty!")
if req_type_fix != 'PUT' or req_type_fix != 'POST':
Exception("Request type has to be: PUT or POST!")
session = requests.Session()
req = requests.Request(req_type_fix, url, headers=headers, data=payload)
prepared = req.prepare()
logger_global.info('HTTP request: \n' + self.pretty_http_request_to_string(prepared))
if not self.simulation_mode:
response = session.send(prepared)
logger_global.info('Response code: ' + str(response.status_code))
return response
return None
def post_guarded_request(self, payload, url=' ', headers=None):
return self.general_guarded_request('POST', payload, url, headers)
def put_guarded_request(self, payload, url=' ', headers=None):
return self.general_guarded_request('PUT', payload, url, headers)
def pretty_http_request_to_string(self, req):
"""
The method provides a printing method for pre-prepared HTTP requests.
Source: https://stackoverflow.com/questions/20658572/python-requests-print-entire-http-request-raw
Author: AntonioHerraizS
Usage
-----
req = requests.Request("POST", DTP_CONFIG.get_api_uri('send_blob'), headers=headers,
data=payload, files=files) # any request prepared = req.prepare() __pretty_http_request_to_string(prepared)
Parameters
----------
req : Request, obligatory
the pre-prepared request
"""
request_str = '{}\n{}\r\n{}\r\n\r\n{}\n{}'.format(
'-----------START-----------',
req.method + ' ' + req.url,
'\r\n'.join('{}: {}'.format(k, v) for k, v in req.headers.items()),
req.body,
'-----------END-----------'
)
return request_str
def revert_last_session(self, session_file):
"""
The method can revert the last non-empty sessions.
Parameters
----------
session_file : str, obligatory
path to the sessions file
"""
counter = 0
with FileReadBackwards(session_file, encoding="utf-8") as frb:
for line in tqdm(frb):
# that will be the last date once the beginning of the file is reached.
msg_date = line[0: line.find(' : ')]
if self.log_markers['link_elem_blob'] in line:
element_uuid, blob_uuid = get_info_from_log(line, self.log_markers['link_elem_blob'])
counter += 1
self.unlink_node_from_blob(element_uuid, blob_uuid)
elif self.log_markers['new_blob'] in line:
blob_uuid = get_info_from_log(line, self.log_markers['new_blob'])[0]
self.delete_blob_from_platform(blob_uuid)
counter += 1
elif self.log_markers['update_asdesigned_param'] in line:
element_iri = get_info_from_log(line, self.log_markers['update_asdesigned_param'])[0]
self.delete_asdesigned_param_node(element_iri)
counter += 1
elif self.log_markers['update_operation'] in line:
node_iri, dump_path = get_info_from_log(line, self.log_markers['update_operation'])
self.revert_node_update(node_iri, dump_path)
counter += 1
elif self.log_markers['update_construction'] in line:
node_iri, dump_path = get_info_from_log(line, self.log_markers['update_construction'])
self.revert_node_update(node_iri, dump_path)
counter += 1
elif self.log_markers['remove_param'] in line:
node_iri, field, field_value = get_info_from_log(line, self.log_markers['remove_param'])
self.add_param_in_node(node_iri, field, field_value)
counter += 1
elif self.log_markers['add_param'] in line:
node_iri, field = get_info_from_log(line, self.log_markers['remove_param'])
self.delete_param_in_node(node_iri, field, is_revert_session=True)
counter += 1
else:
try:
node_class = next(
substring for substring in self.log_markers_node_classes.values() if substring in line)
except StopIteration as E:
continue
index = line.find(node_class)
node_iri = line[index + len(node_class) + 1:].strip()
try:
node_uuid = self.get_uuid_for_iri(node_iri)
except Exception as e:
if hasattr(e, 'message'):
e_msg = e.message
else:
e_msg = e
logger_global.error(
'Error at the session revert for entry at : ' + msg_date + ', the message: ' + str(
e_msg) + '.')
continue
self.delete_node_from_graph(node_uuid)
counter = counter + 1
logger_global.info('The session started at: ' + msg_date + ', has been reverted.')
def query_all_pages(self, fetch_function, *fetch_function_arg):
"""
The method will query all pages for a query
Args:
fetch_function: function used to query DTP
fetch_function_arg: arguments to fetch_function
Returns:
"""
query_response_all_pages = fetch_function(*fetch_function_arg)
elements = query_response_all_pages
while 'next' in elements.keys() and elements['size'] != 0:
if len(fetch_function_arg) > 1:
elements = fetch_function(*fetch_function_arg, url=elements['next'])
else:
elements = fetch_function(url=elements['next'])
if elements['size'] <= 0:
break
query_response_all_pages['items'] += elements['items']
query_response_all_pages['size'] += elements['size']
return query_response_all_pages
# Below code snippet for testing only
def parse_args():
"""
Get parameters from user
"""
parser = argparse.ArgumentParser(description='Prepare DTP graph')
parser.add_argument('--xml_path', '-x', type=str, help='path to config xml file', required=True)
parser.add_argument('--simulation', '-s', default=False, action='store_true')
return parser.parse_args()
if __name__ == "__main__":
args = parse_args()
dtp_config = DTPConfig(args.xml_path)
dtp_api = DTPApi(dtp_config, simulation_mode=args.simulation)
response = dtp_api.activity_count_connected_task_nodes("http://bim2twin.eu/mislata_wp3/activity91217940_2")
print('Response:\n', response)