-
Notifications
You must be signed in to change notification settings - Fork 1
/
db_utils.py
111 lines (86 loc) · 3.25 KB
/
db_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import logging
import sys
from mysql.connector import errorcode
import mysql.connector
class DatabaseInconsistentError(Exception):
    """Exception type for signalling an inconsistent database state.

    This module only declares the type; nothing in this file raises it.
    """
class InvalidFilenameError(Exception):
    """Exception type for signalling an unacceptable filename.

    This module only declares the type; nothing in this file raises it.
    """
class DataInvariantException(Exception):
    """Exception type for signalling a violated data invariant.

    This module only declares the type; nothing in this file raises it.
    """
class DbUtils:
    """Small convenience wrapper around one ``mysql.connector`` connection.

    The connection is opened in the constructor and reused by every helper
    method. A failure to connect is logged and terminates the process with
    exit status 1 (see :meth:`connect`).
    """

    def __init__(self, database_user, database_password, database_port,
                 database_host, database_name):
        """Store the connection settings and open the connection immediately.

        Args:
            database_user: User name passed to ``mysql.connector.connect``.
            database_password: Password passed to ``mysql.connector.connect``.
            database_port: Port passed to ``mysql.connector.connect``.
            database_host: Host passed to ``mysql.connector.connect``.
            database_name: Database (schema) name passed to
                ``mysql.connector.connect``.
        """
        self.database_user = database_user
        self.database_password = database_password
        self.database_port = database_port
        self.database_host = database_host
        self.database_name = database_name
        self.logger = logging.getLogger('Client.dbutils')
        self.cnx = None
        self.connect()

    def reset_connection(self) -> None:
        """Close the current connection (best effort) and open a new one."""
        self.logger.info(f"Resetting connection to {self.database_host}")
        if self.cnx is not None:
            try:
                self.cnx.close()
            except Exception as err:
                # Closing a connection that is already dead may fail; that is
                # fine because it is being discarded anyway.
                self.logger.debug(f"Ignoring error while closing connection: {err}")
            # connect() only dials when cnx is None, so the stale handle must
            # be dropped or the "reset" would silently keep the closed one.
            self.cnx = None
        self.connect()

    def connect(self) -> None:
        """Open the connection if none is held; otherwise do nothing.

        Any connection failure is logged and the process is terminated via
        ``sys.exit(1)``.
        """
        if self.cnx is not None:
            return

        self.logger.debug(f"Connecting to db {self.database_host}...")
        try:
            self.cnx = mysql.connector.connect(user=self.database_user,
                                               password=self.database_password,
                                               host=self.database_host,
                                               port=self.database_port,
                                               database=self.database_name)
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                self.logger.error("Starting client...")
                self.logger.error("SQL: Access denied")
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                self.logger.error("Database does not exist")
            else:
                self.logger.error(err)
            sys.exit(1)
        except Exception as err:
            self.logger.error(f"Unknown exception: {err}")
            sys.exit(1)
        self.logger.info("Db connected")

    @staticmethod
    def _run_statement(cursor, sql, params) -> None:
        """Execute *sql* on *cursor*, binding *params* only when supplied."""
        if params is None:
            cursor.execute(sql)
        else:
            cursor.execute(sql, params)

    def get_one_record(self, sql, params=None):
        """Return the first column of the first row produced by *sql*.

        Args:
            sql: Statement to execute.
            params: Optional values forwarded to ``cursor.execute`` so the
                driver performs the parameter binding.

        Returns:
            The first column of the first row, or ``None`` when the query
            yields no row or raises. In both of those cases a warning is
            logged; an exception is additionally reported on stderr.
        """
        cursor = self.get_cursor()
        try:
            self._run_statement(cursor, sql, params)
            row = cursor.fetchone()
        except Exception as e:
            print(f"Exception thrown while processing sql: {sql}\n{e}\n", file=sys.stderr, flush=True)
            row = None
        finally:
            cursor.close()

        if row is None:
            self.logger.warning(f"Warning: No results from: \n\n{sql}\n")
            return None
        return row[0]

    def get_records(self, query, params=None) -> list:
        """Execute *query* and return all of its rows as a list.

        Args:
            query: Statement to execute.
            params: Optional values forwarded to ``cursor.execute``.

        Raises:
            Whatever the cursor raises; the cursor is closed before the
            exception propagates.
        """
        cursor = self.get_cursor()
        try:
            self._run_statement(cursor, query, params)
            record_list = list(cursor.fetchall())
            self.logger.debug(f"get records SQL: {query}")
        finally:
            cursor.close()
        return record_list

    def get_cursor(self):
        """Return a new cursor on the connection, connecting first if needed."""
        self.connect()
        return self.cnx.cursor()

    def execute(self, sql, params=None) -> None:
        """Execute *sql* and commit.

        Args:
            sql: Statement to execute.
            params: Optional values forwarded to ``cursor.execute``.

        Raises:
            Whatever the cursor or the commit raises; the cursor is closed
            before the exception propagates.
        """
        cursor = self.get_cursor()
        try:
            self.logger.debug(f"SQL: {sql}")
            self._run_statement(cursor, sql, params)
            self.cnx.commit()
        finally:
            cursor.close()

    def commit(self) -> None:
        """Commit the current transaction on the held connection."""
        self.cnx.commit()