forked from huchenxucs/ChatDB
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mysql.py
129 lines (110 loc) · 4.07 KB
/
mysql.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import pymysql
from prettytable import PrettyTable
def sql_result_to_table_str(sql_result):
# Create a PrettyTable object
table = PrettyTable()
table.field_names = sql_result[0].keys()
# Add rows to the table
for row in sql_result:
table.add_row(row.values())
table.float_format = ".2"
return str(table)
class MySQLDB(object):
def __init__(self, host, port, user, password, database=None):
self.host = host
self.port = port
self.user = user
self.password = password
self.database = database
self.conn = None
self.cursor = None
def connect(self):
self.conn = pymysql.connect(
host=self.host,
port=self.port,
user=self.user,
password=self.password,
database=self.database,
cursorclass=pymysql.cursors.DictCursor
)
self.cursor = self.conn.cursor()
def disconnect(self):
if self.cursor:
self.cursor.close()
if self.conn:
self.conn.close()
def execute_sql(self, sql, raise_err=False):
try:
self.connect()
for sub_sql in sql.split(";"):
sub_sql = sub_sql.strip()
if len(sub_sql) > 0:
self.cursor.execute(sub_sql)
result = self.cursor.fetchall()
self.conn.commit()
except Exception as e:
self.conn.rollback()
error_message = f"SQL error: {str(e)}"
if raise_err:
raise e
else:
return e, error_message
finally:
self.disconnect()
# print(result)
# convert query result to string
if result:
# rows = [', '.join(str(v) for v in row.values()) for row in result]
# header = ', '.join(result[0].keys())
# out_str = f"{header}\n{'-' * len(header)}\n" + '\n'.join(rows)
out_str = sql_result_to_table_str(result)
else:
if "create" in sql.lower():
out_str = "create table successfully."
elif "insert" in sql.lower():
out_str = "insert data successfully."
elif "delete" in sql.lower():
out_str = "delete data successfully."
elif "update" in sql.lower():
out_str = "update data successfully."
else:
out_str = "no results found."
return result, out_str
def select(self, table, columns="*", condition=None):
sql = f"SELECT {columns} FROM {table}"
if condition:
sql += f" WHERE {condition}"
return self.execute_sql(sql)
def insert(self, table, data):
keys = ','.join(data.keys())
values = ','.join([f"'{v}'" for v in data.values()])
sql = f"INSERT INTO {table} ({keys}) VALUES ({values})"
return self.execute_sql(sql)
def update(self, table, data, condition):
set_values = ','.join([f"{k}='{v}'" for k, v in data.items()])
sql = f"UPDATE {table} SET {set_values} WHERE {condition}"
return self.execute_sql(sql)
def delete(self, table, condition):
sql = f"DELETE FROM {table} WHERE {condition}"
return self.execute_sql(sql)
def create_database(self, database):
try:
self.execute_sql(f"DROP DATABASE `{database}`", raise_err=True)
except Exception as e:
pass
sql = f"CREATE DATABASE `{database}`"
self.execute_sql(sql, raise_err=True)
if self.database is None:
self.database = database
def drop_database(self, ):
assert self.database is not None
sql = f"DROP DATABASE `{self.database}`"
self.execute_sql(sql, raise_err=True)
self.database = None
# def __del__(self):
# if self.database is not None:
# self.drop_database()
if __name__ == '__main__':
from config import cfg
mysql = MySQLDB(host=cfg.mysql_host, user=cfg.mysql_user, password=cfg.mysql_password,
port=cfg.mysql_port, database="try2")