From 2f8457a60bd58b740c85037cc196506fdca9b30b Mon Sep 17 00:00:00 2001
From: yuenmichelle1
Date: Tue, 17 Oct 2023 14:10:51 -0500
Subject: [PATCH] Update backfill scripts for classifications (#40)

* update classifications backfill script to chunk and save in files
* update to <=; remove unused limit in query
* update copy into source
* split backfill into file creation, then copy from files
---
 scripts/backfill_classifications.py        | 18 +++++----
 scripts/copy_classifications_from_files.py | 29 ++++++++++++++
 .../save_classifications_chunk_in_files.py | 40 +++++++++++++++++++
 3 files changed, 79 insertions(+), 8 deletions(-)
 create mode 100644 scripts/copy_classifications_from_files.py
 create mode 100644 scripts/save_classifications_chunk_in_files.py

diff --git a/scripts/backfill_classifications.py b/scripts/backfill_classifications.py
index e8a8342..a5b03fb 100644
--- a/scripts/backfill_classifications.py
+++ b/scripts/backfill_classifications.py
@@ -14,16 +14,18 @@
 ERAS_PW = os.getenv('ERAS_PW')
 FIRST_INGESTED_CLASSIFICATION_ID = os.getenv('FIRST_INGESTED_CLASSIFICATION_ID')
 
-now = datetime.now()
-current_time = now.strftime("%H:%M:%S")
-print("CLASSIFICATIONS backfill BEFORE Time =", current_time)
+start_time = datetime.now()
+print("CLASSIFICATIONS backfill BEFORE Time =", start_time)
+
+classifications_query = "select id::bigint as classification_id, created_at as event_time, updated_at as classification_updated_at, CASE WHEN metadata ->> 'started_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'started_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') END started_at, CASE WHEN metadata ->> 'finished_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'finished_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') END finished_at, project_id::bigint, workflow_id::bigint, user_id::bigint, array_remove(string_to_array(replace(replace(replace(metadata ->> 'user_group_ids', '[', ''), ']', ''), ' ', '' ), ','), 'null')::bigint[] as user_group_ids, EXTRACT(EPOCH FROM (CASE WHEN metadata ->> 'finished_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'finished_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') END) - (CASE WHEN metadata ->> 'started_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'started_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') END)) as session_time, created_at, updated_at from classifications where id < %s order by id desc limit 5000000"
+
 
 with psycopg.connect(f"host={PANOPTES_CONN} port={PANOPTES_PORT} dbname={PANOPTES_DB} user={PANOPTES_USER} password={PANOPTES_PW} sslmode=require") as panoptes_db_conn, psycopg.connect(f"host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW} sslmode=require") as timescale_db_conn:
-    with panoptes_db_conn.cursor(name="panoptes_cursor").copy("COPY (select id::bigint as classification_id, created_at as event_time, updated_at as classification_updated_at, CASE WHEN metadata ->> 'started_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'started_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') END started_at, CASE WHEN metadata ->> 'finished_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'finished_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') END finished_at, project_id::bigint, workflow_id::bigint, user_id::bigint, string_to_array(replace(replace(replace(metadata ->> 'user_group_ids', '[', ''), ']', ''), ' ', '' ), ',')::bigint[] as user_group_ids, EXTRACT(EPOCH FROM (CASE WHEN metadata ->> 'finished_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'finished_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') END) - (CASE WHEN metadata ->> 'started_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'started_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') END)) as session_time, created_at, updated_at from classifications where id < %s order by id) TO STDOUT (FORMAT BINARY)", (FIRST_INGESTED_CLASSIFICATION_ID,)) as panoptes_copy:
-        with timescale_db_conn.cursor().copy("COPY classification_events FROM STDIN (FORMAT BINARY)") as timescale_copy:
+    with panoptes_db_conn.cursor(name="panoptes_cursor").copy(f"COPY ({classifications_query}) TO STDOUT (FORMAT BINARY)", (FIRST_INGESTED_CLASSIFICATION_ID,)) as panoptes_copy:
+        with timescale_db_conn.cursor(name="timescale_cursor").copy("COPY classification_events FROM STDIN (FORMAT BINARY)") as timescale_copy:
             for data in panoptes_copy:
                 timescale_copy.write(data)
 
-    finish = datetime.now()
-    finish_time = finish.strftime("%H:%M:%S")
-    print("CLASSIFICATIONS backfill AFTER Time =", finish_time)
\ No newline at end of file
+
+finish_time = datetime.now()
+print("CLASSIFICATIONS backfill AFTER Time =", finish_time)
\ No newline at end of file
diff --git a/scripts/copy_classifications_from_files.py b/scripts/copy_classifications_from_files.py
new file mode 100644
index 0000000..46818bb
--- /dev/null
+++ b/scripts/copy_classifications_from_files.py
@@ -0,0 +1,29 @@
+import os
+import psycopg
+from datetime import datetime
+import math
+
+TIMESCALE_CONNECTION = os.getenv('TIMESCALE_CONNECTION')
+TIMESCALE_PORT = os.getenv('TIMESCALE_PORT')
+ERAS_DB = os.getenv('ERAS_DB')
+ERAS_USER = os.getenv('ERAS_USER')
+ERAS_PW = os.getenv('ERAS_PW')
+FIRST_INGESTED_CLASSIFICATION_ID = os.getenv('FIRST_INGESTED_CLASSIFICATION_ID')
+
+limit = 10000000
+num_files = math.ceil(int(FIRST_INGESTED_CLASSIFICATION_ID) / limit)
+
+start_time = datetime.now()
+print("TIMESCALE START COPY FROM CSV BEFORE TIME =", start_time)
+
+output_file_no = 0
+while output_file_no <= num_files:
+    with psycopg.connect(f"host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW} sslmode=require") as timescale_db_conn:
+        with timescale_db_conn.cursor(name="timescale_cursor").copy("COPY classification_events FROM STDIN DELIMITER ',' CSV HEADER") as timescale_copy:
+            with open(f"prod_classifications_{output_file_no}.csv") as f:
+                for line in f:
+                    timescale_copy.write(line)
+    output_file_no += 1
+
+finish_time = datetime.now()
+print("CLASSIFICATIONS TIMESCALE backfill AFTER Time =", finish_time)
\ No newline at end of file
diff --git a/scripts/save_classifications_chunk_in_files.py b/scripts/save_classifications_chunk_in_files.py
new file mode 100644
index 0000000..575cd25
--- /dev/null
+++ b/scripts/save_classifications_chunk_in_files.py
@@ -0,0 +1,40 @@
+import os
+import psycopg
+from datetime import datetime
+import math
+
+PANOPTES_CONN = os.getenv('PANOPTES_CONN')
+PANOPTES_PORT = os.getenv('PANOPTES_PORT')
+PANOPTES_DB = os.getenv('PANOPTES_DB')
+PANOPTES_USER = os.getenv('PANOPTES_USER')
+PANOPTES_PW = os.getenv('PANOPTES_PW')
+FIRST_INGESTED_CLASSIFICATION_ID = os.getenv('FIRST_INGESTED_CLASSIFICATION_ID')
+
+start_time = datetime.now()
+print("CLASSIFICATIONS backfill BEFORE Time =", start_time)
+
+classifications_query = "select id::bigint as classification_id, created_at as event_time, updated_at as classification_updated_at, CASE WHEN metadata ->> 'started_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'started_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') END started_at, CASE WHEN metadata ->> 'finished_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'finished_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') END finished_at, project_id::bigint, workflow_id::bigint, user_id::bigint, array_remove(string_to_array(replace(replace(replace(metadata ->> 'user_group_ids', '[', ''), ']', ''), ' ', '' ), ','), 'null')::bigint[] as user_group_ids, EXTRACT(EPOCH FROM (CASE WHEN metadata ->> 'finished_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'finished_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') END) - (CASE WHEN metadata ->> 'started_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'started_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') END)) as session_time, created_at, updated_at from classifications where id < %s order by id desc"
+
+offset = 0
+limit = 10000000
+num_files = math.ceil(int(FIRST_INGESTED_CLASSIFICATION_ID) / limit)
+
+while offset <= num_files:
+    query = ''
+
+    if offset == 0:
+        query = f"{classifications_query} limit {limit}"
+    else:
+        query = f"{classifications_query} limit {limit} offset {limit * offset}"
+
+    with psycopg.connect(f"host={PANOPTES_CONN} port={PANOPTES_PORT} dbname={PANOPTES_DB} user={PANOPTES_USER} password={PANOPTES_PW} sslmode=require") as panoptes_db_conn:
+        with open(f"prod_classifications_{offset}.csv", "wb") as f:
+            with panoptes_db_conn.cursor(name="panoptes_cursor").copy(f"COPY ({query}) TO STDOUT WITH CSV HEADER", (FIRST_INGESTED_CLASSIFICATION_ID,)) as panoptes_copy:
+                for data in panoptes_copy:
+                    f.write(data)
+    offset += 1
+
+finish_copy_time = datetime.now()
+print("PANOPTES Classification Copy over at ", finish_copy_time)
+
+
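For reference, the chunk/file arithmetic that both new scripts share works like this. The sketch below uses hypothetical values; in the scripts above, the id comes from the FIRST_INGESTED_CLASSIFICATION_ID environment variable and limit is hard-coded to 10000000.

    import math

    # Hypothetical example values; the scripts read the id from the
    # FIRST_INGESTED_CLASSIFICATION_ID env var and hard-code limit = 10000000.
    first_ingested_classification_id = 25_000_000
    limit = 10_000_000

    # One CSV file per chunk of up to `limit` rows, rounded up: ceil(2.5) = 3.
    # Because both loops run while the counter is <= num_files,
    # save_classifications_chunk_in_files.py writes prod_classifications_0.csv
    # through prod_classifications_3.csv (the last file holds only a CSV header),
    # and copy_classifications_from_files.py reads the same file names back.
    num_files = math.ceil(first_ingested_classification_id / limit)  # -> 3

    for offset in range(num_files + 1):
        print(f"prod_classifications_{offset}.csv starts at row OFFSET {offset * limit}")

Note that OFFSET-based chunking re-scans the skipped rows on every iteration, so later chunks get progressively slower; keyset pagination on id would avoid that, but the sketch mirrors the limit/offset approach the scripts take.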