From c0ea72859eb55730bb7e351c36897635a92568f3 Mon Sep 17 00:00:00 2001 From: Jakub Kicinski Date: Fri, 7 Jun 2024 09:26:12 -0700 Subject: [PATCH] wip: SQL results Signed-off-by: Jakub Kicinski --- contest/backend/query.py | 57 ++++++++++++++++++++++++ contest/results-fetcher.py | 88 +++++++++++++++++++++++++++++++++++--- 2 files changed, 139 insertions(+), 6 deletions(-) diff --git a/contest/backend/query.py b/contest/backend/query.py index 9cf361a..5b5c99d 100644 --- a/contest/backend/query.py +++ b/contest/backend/query.py @@ -4,8 +4,10 @@ from flask import Flask from flask import request +import psycopg2 import couchdb import os +import re import datetime @@ -16,6 +18,10 @@ couch = couchdb.Server(f'http://{user}:{pwd}@127.0.0.1:5984') res_db = couch["results"] +db_name = os.getenv('DB_NAME') +psql = psycopg2.connect(database=db_name) +psql.autocommit = True + def branches_to_rows(br_cnt): data = res_db.view('branch/rows', None, @@ -63,3 +69,54 @@ def results(): print(f"Query for {br_cnt} branches, {need_rows} records took: {str(t3-t1)} ({str(t2-t1)}+{str(t3-t2)})") return data + + +def branches_to_rows2(br_cnt): + global psql + + cnt = 0 + with psql.cursor() as cur: + q = f"SELECT branch,count(*) FROM results GROUP BY branch ORDER BY branch DESC LIMIT {br_cnt}" + cur.execute(q) + for r in cur.fetchall(): + cnt += r[0] + return cnt + + +@app.route('/results2') +def results2(): + global psql + + br_name = request.args.get('branch-name') + if br_name: + if re.match(r'^[\w-_ ]+$', br_name) is None: + return {} + + t1 = datetime.datetime.now() + with psql.cursor() as cur: + cur.execute(f"SELECT info FROM results WHERE branch = '{br_name}' LIMIT 100") + rows = [json.loads(r[0]) for r in cur.fetchall()] + t2 = datetime.datetime.now() + print("Query for exact branch took: ", str(t2-t1)) + return rows + + t1 = datetime.datetime.now() + + br_cnt = request.args.get('branches') + try: + br_cnt = int(br_cnt) + except: + br_cnt = None + if not br_cnt: + br_cnt = 10 + + need_rows = branches_to_rows2(br_cnt) + t2 = datetime.datetime.now() + with psql.cursor() as cur: + cur.execute(f"SELECT info FROM results ORDER BY branch DESC LIMIT {need_rows}") + rows = [json.loads(r[0]) for r in cur.fetchall()] + + t3 = datetime.datetime.now() + print(f"Query for {br_cnt} branches, {need_rows} records took: {str(t3-t1)} ({str(t2-t1)}+{str(t3-t2)})") + + return rows diff --git a/contest/results-fetcher.py b/contest/results-fetcher.py index c3ef9a3..ddf5150 100755 --- a/contest/results-fetcher.py +++ b/contest/results-fetcher.py @@ -6,6 +6,7 @@ import datetime import json import os +import psycopg2 import requests import time import uuid @@ -23,8 +24,9 @@ url_pfx=relative/within/server combined=name-of-manifest.json [db] -results-name=db-name -branches-name=db-name +db=db-name +results-name=table-name +branches-name=table-name user=name pwd=pass """ @@ -38,11 +40,18 @@ def __init__(self): # "fetched" is more of a "need state rebuild" self.fetched = True + self.tbl_res = self.config.get("db", "results-name", fallback="results") + self.tbl_brn = self.config.get("db", "branches-name", fallback="branches") + user = self.config.get("db", "user") pwd = self.config.get("db", "pwd") server = couchdb.Server(f'http://{user}:{pwd}@127.0.0.1:5984') - self.res_db = server[self.config.get("db", "results-name", fallback="results")] - self.brn_db = server[self.config.get("db", "branches-name", fallback="branches")] + self.res_db = server[self.tbl_res] + self.brn_db = server[self.tbl_brn] + + db_name = self.config.get("db", "db") + self.psql_conn = psycopg2.connect(database=db_name) + self.psql_conn.autocommit = True def _one(self, rows): rows = list(rows) @@ -51,6 +60,15 @@ def _one(self, rows): return rows[0] def get_branch(self, name): + try: + with self.psql_conn.cursor() as cur: + cur.execute(f"SELECT info FROM {self.tbl_brn} WHERE branch = '{name}'") + rows = cur.fetchall() + return json.loads(rows[0][0]) + except Exception as e: + print("PSQL branch GET FAIL!") + print(e) + branch_info = self.brn_db.find({ 'selector': { 'branch': name @@ -58,6 +76,61 @@ def get_branch(self, name): }) return self._one(branch_info) + def psql_run_selector(self, cur, remote, run): + return cur.mogrify("WHERE branch = %s AND remote = %s AND executor = %s", + (run['branch'], remote["name"], run["executor"],)).decode('utf-8') + + def psql_has_wip(self, remote, run): + with self.psql_conn.cursor() as cur: + cur.execute(f"SELECT branch FROM {self.tbl_res} " + self.psql_run_selector(cur, remote, run)) + rows = cur.fetchall() + return len(rows) > 0 + + def insert_result_psql(self, cur, data): + arg = cur.mogrify("(%s,%s,%s,%s,%s,%s)", (data["branch"], data["remote"], data["executor"], + data["start"], data["end"], json.dumps(data))) + cur.execute(f"INSERT INTO {self.tbl_res} VALUES " + arg.decode('utf-8')) + + def insert_wip_psql(self, remote, run, branch_info): + try: + if self.psql_has_wip(remote, run): + # no point, we have no interesting info to add + return + + data = run.copy() + data["remote"] = remote["name"] + when = datetime.datetime.fromisoformat(branch_info['date']) + data["start"] = str(when) + when += datetime.timedelta(hours=2, minutes=58) + data["end"] = str(when) + data["results"] = None + + with self.psql_conn.cursor() as cur: + self.insert_result_psql(cur, data) + print("PSQL WIP save success!") + except Exception as e: + print("PSQL WIP save FAIL!") + print(e) + + def insert_real_psql(self, remote, run): + data = run.copy() + data["remote"] = remote["name"] + + try: + with self.psql_conn.cursor() as cur: + if self.psql_has_wip(remote, run): + self.insert_result_psql(cur, data) + else: + vals = cur.mogrify("SET t_start = %s, t_end = %s, unparsed = %s", + (data["start"], data["end"], json.dumps(data))).decode('utf-8') + selector = self.psql_run_selector(cur, remote, run) + q = f"UPDATE {self.tbl_res} SET " + vals + selector + cur.execute(q) + print("PSQL FULL save success!") + except Exception as e: + print("PSQL FULL save FAIL!") + print(e) + def get_wip_row(self, remote, run): rows = self.res_db.find({ 'selector': { @@ -71,10 +144,12 @@ def get_wip_row(self, remote, run): return row def insert_wip(self, remote, run): - existing = self.get_wip_row(remote, run) - branch_info = self.get_branch(run["branch"]) + self.insert_wip_psql(remote, run, branch_info) + + existing = self.get_wip_row(remote, run) + data = run.copy() if existing: data['_id'] = existing['_id'] @@ -91,6 +166,7 @@ def insert_wip(self, remote, run): self.res_db.save(data) def insert_real(self, remote, run): + self.insert_real_psql(remote, run) existing = self.get_wip_row(remote, run) data = run.copy()