-
Notifications
You must be signed in to change notification settings - Fork 2
/
test-rds-mysql.py
131 lines (121 loc) · 4.65 KB
/
test-rds-mysql.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import requests
import json
import time
import logging
import random
import pandas as pd
from pyservicebinding import binding
import mysql.connector
#
response = requests.get('http://localhost:8080')
jresponse = response.json()
if jresponse['status'] != "DB binding ok":
print(jresponse['status'])
exit(1)
sb = binding.ServiceBinding()
print(sb.all_bindings())
bindings_list = sb.bindings('mysql', 'OpenShift Database Access / Amazon Relational Database Service (RDS)')
print(bindings_list[0])
db_connection = mysql.connector.connect(database=bindings_list[0].get('database'), \
user=bindings_list[0].get('username'), \
password=bindings_list[0].get('password'), \
#sslmode=bindings_list[0].get('sslmode'), \
host=bindings_list[0].get('host'), \
#options=bindings_list[0].get('options'), \
port=bindings_list[0].get('port'))
print(db_connection)
#
def create_accounts(conn):
with conn.cursor() as cur:
cur.execute('CREATE TABLE accounts (id INT PRIMARY KEY, balance INT)')
cur.execute('INSERT INTO accounts (id, balance) VALUES (1, 1000), (2, 250)')
# logging.debug("create_accounts(): status message: {}".format(cur.statusmessage))
conn.commit()
#
def delete_table(conn):
with conn.cursor() as cur:
cur.execute("DROP TABLE IF EXISTS accounts")
# logging.debug("delete_accounts(): status message: {}".format(cur.statusmessage))
conn.commit()
#
def query(conn):
with conn.cursor() as cur:
cur.execute("SELECT id, balance FROM accounts")
# logging.debug("print_balances(): status message: {}".format(cur.statusmessage))
rows = cur.fetchall()
conn.commit()
print("Balances at {}".format(time.asctime()))
print("Balances at {}".format(time.asctime()))
for row in rows:
print([str(cell) for cell in row])
# #
# def run_transaction(conn, op):
# retries = 0
# max_retries = 3
# with conn:
# while True:
# retries +=1
# if retries == max_retries:
# err_msg = "Transaction did not succeed after {} retries".format(max_retries)
# raise ValueError(err_msg)
#
# try:
# op(conn)
#
# # If we reach this point, we were able to commit, so we break
# # from the retry loop.
# break
# except mysql.connector.Error as e:
# logging.debug("e.pgcode: {}".format(e.pgcode))
# if e.pgcode == '40001':
# # This is a retry error, so we roll back the current
# # transaction and sleep for a bit before retrying. The
# # sleep time increases for each failed transaction.
# conn.rollback()
# logging.debug("EXECUTE SERIALIZATION_FAILURE BRANCH")
# sleep_ms = (2**retries) * 0.1 * (random.random() + 0.5)
# logging.debug("Sleeping {} seconds".format(sleep_ms))
# time.sleep(sleep_ms)
# continue
# else:
# logging.debug("EXECUTE NON-SERIALIZATION_FAILURE BRANCH")
# raise e
#
# #
def transfer_funds(conn, frm, to, amount):
with conn.cursor() as cur:
# Check the current balance.
cur.execute("SELECT balance FROM accounts WHERE id = " + str(frm))
from_balance = cur.fetchone()[0]
if from_balance < amount:
err_msg = "Insufficient funds in account {}: have {}, need {}".format(frm, from_balance, amount)
raise RuntimeError(err_msg)
# Perform the transfer.
cur.execute("UPDATE accounts SET balance = balance - %s WHERE id = %s",
(amount, frm))
cur.execute("UPDATE accounts SET balance = balance + %s WHERE id = %s",
(amount, to))
conn.commit()
# logging.debug("transfer_funds(): status message: {}".format(cur.statusmessage))
delete_table(db_connection)
create_accounts(db_connection)
query(db_connection)
amount = 100
fromId = 1
toId = 2
try:
transfer_funds(db_connection, fromId, toId, amount)
except ValueError as ve:
logging.debug("run_transaction(db_connection, op) failed: {}".format(ve))
pass
query(db_connection)
#
def final_verification(conn):
df_ref = pd.read_csv('validate.csv')
cur = conn.cursor()
cur.execute("SELECT * FROM accounts")
#logging.debug("select_all(): status message: {}".format(cur.statusmessage))
df = pd.DataFrame(cur.fetchall(), columns=['id', 'balance'])
return df.equals(df_ref)
print("******* Validation is: ", final_verification(db_connection))
delete_table(db_connection)