-
Notifications
You must be signed in to change notification settings - Fork 2
/
test-rds-oracle.py
130 lines (103 loc) · 3.7 KB
/
test-rds-oracle.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import requests
import time
import logging
import pandas as pd
from pyservicebinding import binding
import oracledb
def create_accounts(conn):
    """Create the ``accounts`` table and seed it with two rows.

    Runs a CREATE TABLE statement followed by a multi-row ``INSERT ALL`` on a
    cursor obtained from ``conn``, then commits.

    Args:
        conn: Open database connection exposing ``cursor()`` and ``commit()``.
    """
    # The SQL text is assembled from adjacent literals so it stays
    # byte-identical regardless of how this function is indented.
    ddl = (
        "\n"
        "CREATE TABLE accounts\n"
        "( id number NOT NULL,\n"
        "balance number NOT NULL,\n"
        "PRIMARY KEY(id)\n"
        ")\n"
    )
    seed_rows = (
        "\n"
        "INSERT ALL\n"
        "INTO accounts(id, balance) VALUES (1, 1000)\n"
        "INTO accounts(id, balance) VALUES (2, 250)\n"
        "SELECT * FROM DUAL\n"
    )

    with conn.cursor() as cursor:
        for statement in (ddl, seed_rows):
            cursor.execute(statement)
    conn.commit()
def delete_table(conn):
    """Drop the ``accounts`` table if it currently exists.

    Sends one anonymous PL/SQL block that counts matching rows in
    ``user_tables`` and only issues ``DROP TABLE`` when the count is non-zero,
    then commits.

    Args:
        conn: Open database connection exposing ``cursor()`` and ``commit()``.
    """
    drop_if_present = (
        "\n"
        "DECLARE cnt NUMBER;\n"
        "BEGIN\n"
        "SELECT COUNT(*) INTO cnt FROM user_tables WHERE table_name = 'ACCOUNTS';\n"
        "IF cnt <> 0 THEN\n"
        "EXECUTE IMMEDIATE 'DROP TABLE accounts';\n"
        "END IF;\n"
        "END;\n"
    )

    with conn.cursor() as cursor:
        cursor.execute(drop_if_present)
    conn.commit()
def query(conn):
    """Print the id and balance of every row in ``accounts``.

    Output is a ``Balances at <timestamp>`` header followed by one line per
    row, each rendered as a list of stringified cell values.

    Args:
        conn: Open database connection exposing ``cursor()`` and ``commit()``.
    """
    with conn.cursor() as cursor:
        cursor.execute("SELECT id, balance FROM accounts")
        records = cursor.fetchall()
    conn.commit()

    print(f"Balances at {time.asctime()}")
    for record in records:
        print(list(map(str, record)))
def transfer_funds(conn, frm, to, amount):
    """Move ``amount`` from account ``frm`` to account ``to`` atomically.

    The source balance is read first; the two UPDATE statements are only
    issued when it covers ``amount``. All values are passed as bind variables
    rather than being spliced into the SQL text. On any failure the pending
    transaction is rolled back so a later ``commit()`` on the same connection
    cannot persist a half-applied transfer.

    Args:
        conn: Open database connection exposing ``cursor()``, ``commit()`` and
            ``rollback()``.
        frm: Id of the account to debit.
        to: Id of the account to credit.
        amount: Amount to transfer.

    Raises:
        RuntimeError: If either account does not exist or the source account
            holds less than ``amount``.
    """
    try:
        with conn.cursor() as cur:
            # Check the current balance.
            cur.execute(
                "SELECT balance FROM accounts WHERE id = :account_id",
                {"account_id": frm},
            )
            row = cur.fetchone()
            if row is None:
                raise RuntimeError("Account {} does not exist".format(frm))
            from_balance = row[0]
            if from_balance < amount:
                err_msg = "Insufficient funds in account {}: have {}, need {}".format(frm, from_balance, amount)
                raise RuntimeError(err_msg)

            # Perform the transfer.
            cur.execute(
                "UPDATE accounts SET balance = balance - :amount WHERE id = :account_id",
                {"amount": amount, "account_id": frm},
            )
            cur.execute(
                "UPDATE accounts SET balance = balance + :amount WHERE id = :account_id",
                {"amount": amount, "account_id": to},
            )
            # An UPDATE that matches no row succeeds silently; without this
            # check the debited amount would simply vanish.
            if cur.rowcount == 0:
                raise RuntimeError("Account {} does not exist".format(to))
    except Exception:
        # Discard any partially applied debit before propagating the error.
        conn.rollback()
        raise
    conn.commit()
def final_verification(conn):
    """Compare the ``accounts`` table with the reference data in ``validate.csv``.

    The reference file is read from the current working directory. Rows are
    fetched in ascending ``id`` order so the order-sensitive
    ``DataFrame.equals`` comparison does not depend on whatever order the
    database happens to return them in.

    Args:
        conn: Open database connection exposing ``cursor()``.

    Returns:
        True when the fetched rows, loaded into a DataFrame with columns
        ``id`` and ``balance``, equal the reference DataFrame; False otherwise.
    """
    df_ref = pd.read_csv('validate.csv')
    # The context manager closes the cursor even if the query fails.
    with conn.cursor() as cur:
        cur.execute("SELECT id, balance FROM accounts ORDER BY id")
        cursor_fetch = cur.fetchall()
    logging.debug(f"select_all(): fetch: {cursor_fetch}")
    df = pd.DataFrame(cursor_fetch, columns=['id', 'balance'])
    return df.equals(df_ref)
def main():
    """Run the end-to-end Oracle service-binding smoke test.

    Steps: confirm the local status endpoint reports a working binding, read
    the Oracle connection details from the service binding, connect, recreate
    and seed the ``accounts`` table, transfer funds between the two accounts,
    print the balances before and after, print the validation result and drop
    the table again.

    Raises:
        SystemExit: With status 1 when the status endpoint does not report
            ``"DB binding ok"`` or no matching Oracle binding is found.
    """
    # RETRIEVE DDBB INFO
    # A timeout keeps the script from hanging forever if the endpoint is down.
    response = requests.get('http://localhost:8080', timeout=30)
    jresponse = response.json()
    if jresponse['status'] != "DB binding ok":
        print(jresponse['status'])
        # SystemExit works even where the interactive ``exit`` helper is absent.
        raise SystemExit(1)

    sb = binding.ServiceBinding()
    # NOTE(review): this prints the full binding data, which presumably
    # includes the database password - confirm that is acceptable for this test.
    print(sb.all_bindings())
    # NOTE: an earlier revision looked the binding up under the provider name
    # 'Red Hat DBaaS / Amazon Relational Database Service (RDS)'.
    bindings_list = sb.bindings('oracle', 'OpenShift Database Access / Amazon Relational Database Service (RDS)')
    if not bindings_list:
        print("No Oracle service binding found")
        raise SystemExit(1)
    db_binding = bindings_list[0]

    # CONNECT TO ORACLE
    # Credentials are passed as separate arguments so that a password
    # containing '/' or '@' cannot corrupt the connect string.
    connect_string = f'{db_binding["host"]}:{db_binding["port"]}/{db_binding["database"]}'
    with oracledb.connect(
        user=db_binding["username"],
        password=db_binding["password"],
        dsn=connect_string,
    ) as connection:
        print(connection)

        # PROVISION TABLE
        delete_table(connection)
        create_accounts(connection)

        # GET ACCOUNTS BALANCE
        query(connection)

        # TRANSFER BETWEEN ACCOUNTS
        amount = 100
        fromId = 1
        toId = 2

        try:
            transfer_funds(connection, fromId, toId, amount)
        except (RuntimeError, ValueError) as err:
            # transfer_funds signals business failures with RuntimeError;
            # ValueError is still caught for backward compatibility.
            logging.debug("transfer_funds(connection, fromId, toId, amount) failed: {}".format(err))

        # GET ACCOUNTS BALANCE
        query(connection)

        # VALIDATE TRANSFERS
        print("******* Validation is: ", final_verification(connection))

        delete_table(connection)


if __name__ == "__main__":
    main()