-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstartel_clients_html_importer.py
100 lines (77 loc) · 2.98 KB
/
startel_clients_html_importer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import os
import sys
import re
import django
# Point Django at the project settings unless the environment already names a
# settings module (setdefault leaves an existing value untouched).
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'clients.settings')
# Initialise Django before the imports below. The model import further down
# comes after setup(), presumably on purpose: Django models cannot be imported
# in a standalone script until the app registry has been loaded. Do not
# reorder these lines.
django.setup()
from django.db.migrations.executor import MigrationExecutor
from django.db import connections, DEFAULT_DB_ALIAS
from agent.models import ClientPage
from bs4 import BeautifulSoup
import logging
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
def is_database_synchronized(database):
    """Tell whether the given database has no pending migrations.

    Args:
        database: Alias used to look up the connection in ``connections``.

    Returns:
        True when the migration plan towards the leaf nodes of the migration
        graph is empty, False otherwise.
    """
    db_connection = connections[database]
    # Called before building the executor, as in the original flow.
    db_connection.prepare_database()
    migration_executor = MigrationExecutor(db_connection)
    leaf_targets = migration_executor.loader.graph.leaf_nodes()
    pending_plan = migration_executor.migration_plan(leaf_targets)
    # An empty plan means there is nothing left to apply.
    return not pending_plan
class MigrationException(Exception):
    """Signals that the database still has unapplied migrations."""
class StartelClientsHTMLImporter:
    """Import client pages from an HTML report into ``ClientPage`` rows.

    The report is split into blocks on ``<BR class=brk>`` (matched
    case-insensitively). For every block, the client id and name are read
    from the first two ``td`` cells of the row at index 2 of the first
    ``tbody``; the whole block is stored as the row's ``data``.
    """

    # Separator between per-client blocks in the report.
    _BLOCK_SEPARATOR = re.compile(r'<BR class=brk>', flags=re.IGNORECASE)

    # Zero-based index of the ``tr`` that carries the id and name cells.
    # NOTE(review): the original comments said "second tr", but index 2 is the
    # third row. The index is kept unchanged - confirm against a sample report.
    _IDENTITY_ROW_INDEX = 2

    def __init__(self, report_path):
        """Remember the path of the report file to import."""
        self.report_path = report_path

    def start(self):
        """Run the import and log a summary of created and updated rows.

        Blocks that lack the expected table structure, or whose id cell is
        not an integer, are skipped with a warning so that one malformed
        block (for example an empty chunk after the last separator) does not
        abort the whole import.

        Raises:
            MigrationException: If the default database is not migrated.
        """
        self._raise_if_not_synchronized()
        updated_count = 0
        created_count = 0
        with open(self.report_path, 'r') as report:
            content = report.read()
        for block in self._BLOCK_SEPARATOR.split(content):
            # Parse the block once and take both cells from the same row,
            # instead of building the soup separately for id and name.
            row = self._find_identity_row(block)
            cells = row.find_all('td') if row is not None else []
            if len(cells) < 2:
                logger.warning('Skipping block without client id and name cells')
                continue
            raw_id, name = cells[0].text, cells[1].text
            try:
                client_id = int(raw_id)
            except ValueError:
                logger.warning(f'Skipping non-integer {raw_id}')
                continue
            _, created = ClientPage.objects.update_or_create(
                client_id=client_id,
                defaults={'data': block, 'name': name},
            )
            if created:
                created_count += 1
                logger.info(f'Adding {client_id} - {name}')
            else:
                updated_count += 1
                logger.warning(f'Updating {client_id} - {name}')
        logger.info(
            '\nImport finished\n'
            f'Updated:\t{updated_count}\n'
            f'Created:\t{created_count}\n'
            f'Total:\t\t{updated_count + created_count}\n'
        )

    @classmethod
    def _find_identity_row(cls, block):
        """Return the ``tr`` holding id and name, or None if it is missing.

        None is returned when the block has no ``tbody`` or the first
        ``tbody`` has too few rows.
        """
        soup = BeautifulSoup(block, 'html5lib')
        tbody = soup.find('tbody')
        if tbody is None:
            return None
        rows = tbody.find_all('tr')
        if len(rows) <= cls._IDENTITY_ROW_INDEX:
            return None
        return rows[cls._IDENTITY_ROW_INDEX]

    @staticmethod
    def _get_client_id(block):
        """Return the text of the first ``td`` in the identity row.

        Kept for callers outside ``start``. Raises AttributeError or
        IndexError when the block lacks the expected table structure.
        """
        soup = BeautifulSoup(block, 'html5lib')
        rows = soup.find('tbody').find_all('tr')
        row = rows[StartelClientsHTMLImporter._IDENTITY_ROW_INDEX]
        return row.find('td').text

    @staticmethod
    def _get_client_name(block):
        """Return the text of the second ``td`` in the identity row.

        Kept for callers outside ``start``. Raises AttributeError or
        IndexError when the block lacks the expected table structure.
        """
        soup = BeautifulSoup(block, 'html5lib')
        rows = soup.find('tbody').find_all('tr')
        row = rows[StartelClientsHTMLImporter._IDENTITY_ROW_INDEX]
        return row.find_all('td')[1].text

    @staticmethod
    def _raise_if_not_synchronized():
        """Raise MigrationException when the default database is not migrated."""
        if not is_database_synchronized(DEFAULT_DB_ALIAS):
            raise MigrationException('Database not migrated. Run `manage.py migrate` before importing.')
if __name__ == '__main__':
    # Command-line entry point: the first argument is the report to import.
    if len(sys.argv) < 2:
        sys.exit(f'Please provide a report path: `{sys.argv[0]} path/to/the/report.htm`')
    report_path = sys.argv[1]
    importer = StartelClientsHTMLImporter(report_path)
    try:
        importer.start()
    except MigrationException as migration_error:
        # Passing the exception to sys.exit prints its message and exits non-zero.
        sys.exit(migration_error)