From 9cedd9edbb457ee603c2ba2e997d85c182039e8a Mon Sep 17 00:00:00 2001 From: yuenmichelle1 Date: Thu, 21 Sep 2023 11:06:29 -0500 Subject: [PATCH] Adding backfill scripts (#31) * Create backfill_talk_comments.py * backfill classifications * Update backfill_classifications.py * add user_group_membership_backfill and update select query of backfilling talk comments and classifications * add update classification events --- scripts/backfill_classifications.py | 29 ++++++++ scripts/backfill_talk_comments.py | 21 ++++++ ...roup_membership_classification_backfill.py | 72 +++++++++++++++++++ 3 files changed, 122 insertions(+) create mode 100644 scripts/backfill_classifications.py create mode 100644 scripts/backfill_talk_comments.py create mode 100644 scripts/user_group_membership_classification_backfill.py diff --git a/scripts/backfill_classifications.py b/scripts/backfill_classifications.py new file mode 100644 index 0000000..53a2ca7 --- /dev/null +++ b/scripts/backfill_classifications.py @@ -0,0 +1,29 @@ +import os +import psycopg +from datetime import datetime + +PANOPTES_CONN = os.getenv('PANOPTES_CONN') +PANOPTES_PORT = os.getenv('PANOPTES_PORT') +PANOPTES_DB = os.getenv('PANOPTES_DB') +PANOPTES_USER = os.getenv('PANOPTES_USER') +PANOPTES_PW = os.getenv('PANOPTES_PW') +TIMESCALE_CONNECTION = os.getenv('TIMESCALE_CONNECTION') +TIMESCALE_PORT = os.getenv('TIMESCALE_PORT') +ERAS_DB = os.getenv('ERAS_DB') +ERAS_USER = os.getenv('ERAS_USER') +ERAS_PW = os.getenv('ERAS_PW') +FIRST_INGESTED_CLASSIFICATION_ID = os.getenv('FIRST_INGESTED_CLASSIFICATION_ID') + +now = datetime.now() +current_time = now.strftime("%H:%M:%S") +print("CLASSIFICATIONS backfill BEFORE Time =", current_time) + +with psycopg.connect(f"host={PANOPTES_CONN} port={PANOPTES_PORT} dbname={PANOPTES_DB} user={PANOPTES_USER} password={PANOPTES_PW}") as panoptes_db_conn, psycopg.connect(f"host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW}") as timescale_db_conn: + with panoptes_db_conn.cursor(name="panoptes_cursor").copy("COPY (select id as classification_id, created_at as event_time, updated_at as classification_updated_at, TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') as started_at, TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') as finished_at, project_id, workflow_id, user_id, string_to_array(replace(replace(replace(metadata ->> 'user_group_ids', '[', ''), ']', ''), ' ', '' ), ',')::int[] as user_group_ids, EXTRACT(EPOCH FROM TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') - TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS')) as session_time, created_at, updated_at from classifications where id < %s) TO STDOUT (FORMAT BINARY)", (FIRST_INGESTED_CLASSIFICATION_ID,)) as panoptes_copy: + with timescale_db_conn.cursor().copy("COPY classification_events FROM STDIN (FORMAT BINARY)") as timescale_copy: + for data in panoptes_copy: + timescale_copy.write(data) + + finish = datetime.now() + finish_time = finish.strftime("%H:%M:%S") + print("CLASSIFICATIONS backfill AFTER Time =", finish_time) \ No newline at end of file diff --git a/scripts/backfill_talk_comments.py b/scripts/backfill_talk_comments.py new file mode 100644 index 0000000..2cfb791 --- /dev/null +++ b/scripts/backfill_talk_comments.py @@ -0,0 +1,21 @@ +import os +import psycopg + +TALK_CONN = os.getenv('TALK_CONNECTION') +TALK_PORT = os.getenv('TALK_PORT') +TALK_DB = os.getenv('TALK_DB') +TALK_USER = os.getenv('TALK_USER') +TALK_PW = os.getenv('TALK_PW') +TIMESCALE_CONNECTION = os.getenv('TIMESCALE_CONNECTION') +TIMESCALE_PORT = os.getenv('TIMESCALE_PORT') +ERAS_DB = os.getenv('ERAS_DB') +ERAS_USER = os.getenv('ERAS_USER') +ERAS_PW = os.getenv('ERAS_PW') +FIRST_INGESTED_COMMENT_ID = os.getenv('FIRST_COMMENT_ID') + + +with psycopg.connect(f"host={TALK_CONN} port={TALK_PORT} dbname={TALK_DB} user={TALK_USER} password={TALK_PW}") as talk_db_conn, psycopg.connect(f"host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW}") as timescale_db_conn: + with talk_db_conn.cursor(name="talk").copy("COPY (SELECT id as comment_id, created_at as event_time, updated_at as comment_updated_at, project_id, user_id, created_at, updated_at from comments where id < %s}) TO STDOUT (FORMAT BINARY)", (FIRST_INGESTED_COMMENT_ID,)) as talk_copy: + with timescale_db_conn.cursor().copy("COPY comment_events FROM STDIN (FORMAT BINARY)") as timescale_copy: + for data in talk_copy: + timescale_copy.write(data) \ No newline at end of file diff --git a/scripts/user_group_membership_classification_backfill.py b/scripts/user_group_membership_classification_backfill.py new file mode 100644 index 0000000..c168474 --- /dev/null +++ b/scripts/user_group_membership_classification_backfill.py @@ -0,0 +1,72 @@ +import os +import argparse +import psycopg +from datetime import datetime + +PANOPTES_CONN = os.getenv('PANOPTES_CONN') +PANOPTES_PORT = os.getenv('PANOPTES_PORT') +PANOPTES_DB = os.getenv('PANOPTES_DB') +PANOPTES_USER = os.getenv('PANOPTES_USER') +PANOPTES_PW = os.getenv('PANOPTES_PW') +TIMESCALE_CONNECTION = os.getenv('TIMESCALE_CONNECTION') +TIMESCALE_PORT = os.getenv('TIMESCALE_PORT') +ERAS_DB = os.getenv('ERAS_DB') +ERAS_USER = os.getenv('ERAS_USER') +ERAS_PW = os.getenv('ERAS_PW') + +now = datetime.now() + +current_time = now.strftime("%H:%M:%S") +print("BEFORE Time =", current_time) + +parser = argparse.ArgumentParser() +parser.add_argument("-ug", "--user_group_id", type=int) +parser.add_argument('email_domain_formats') + +args = parser.parse_args() +user_group_id = args.user_group_id +# email formats in form of comma separated string with no spaces (eg. "%a.com,%b.org%") +email_formats = args.email_domain_formats + +panoptes_db_conn = psycopg.connect(f"host={PANOPTES_CONN} port={PANOPTES_PORT} dbname={PANOPTES_DB} user={PANOPTES_USER} password={PANOPTES_PW}") +panoptes_cursor = panoptes_db_conn.cursor() + +eras_conn = psycopg.connect(f"host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW}") +eras_cursor = eras_conn.cursor() + +# get ids of users that are not in group yet +panoptes_cursor.execute("SELECT id from users where email ILIKE ANY(STRING_TO_ARRAY(%s, ',')) AND id NOT IN (SELECT user_id from memberships where user_group_id=%s)", (email_formats, user_group_id)) + +not_in_group_yet_user_ids = [result[0] for result in panoptes_cursor.fetchall()] + +if len(not_in_group_yet_user_ids) > 0: + # create memberships to user group + memberships_to_create = list(map(lambda user_id: (user_group_id, user_id, 'active', '{"group_member"}'),not_in_group_yet_user_ids)) + panoptes_cursor.executemany("INSERT INTO memberships (user_group_id, user_id, state, roles) VALUES (%s,%s,%s,%s)", memberships_to_create) + + panoptes_db_conn.commit() + + # eras get classification_events of not_in_group_yet_user_ids that does not have user_group_id within their user_group_ids classification_event + eras_cursor.execute("SELECT classification_id, event_time, session_time, project_id, user_id, workflow_id, created_at, updated_at, user_group_ids from classification_events WHERE user_id = ANY(%s) AND %s!=ANY(user_group_ids)", (not_in_group_yet_user_ids, user_group_id)) + classification_events_to_backfill = eras_cursor.fetchall() + + # create classification_user_group + classification_user_groups = list(map(lambda classification: (classification[0:8] + (user_group_id,)), classification_events_to_backfill)) + eras_cursor.executemany("INSERT INTO classification_user_groups (classification_id, event_time, session_time, project_id, user_id, workflow_id, created_at, updated_at, user_group_id) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)", classification_user_groups) + + # update classification_events' user_group_ids so that it includes new classification_id + classification_events_to_update = list(map(lambda classification_event: {'classification_id': classification_event[0], 'user_group_ids': ([user_group_id] if classification_event[8] is None else classification_event[8] +[user_group_id])} ,classification_events_to_backfill)) + eras_cursor.executemany("UPDATE classification_events SET user_group_ids = %(user_group_ids)s WHERE classification_id = %(classification_id)s", classification_events_to_update) + + eras_conn.commit() + +panoptes_cursor.close() +panoptes_db_conn.close() +eras_cursor.close() +eras_conn.close() + + +finish = datetime.now() +finish_time = finish.strftime("%H:%M:%S") +print("AFTER Time =", finish_time) +