From f8fa8e312aea56368e6db8727dfb72ec81492623 Mon Sep 17 00:00:00 2001
From: yuenmichelle1
Date: Tue, 17 Oct 2023 14:07:55 -0500
Subject: [PATCH] split backfill to file creation then copy from files

---
Review notes: the added script now converts FIRST_INGESTED_CLASSIFICATION_ID
to an int before dividing, streams each CSV in chunks through a closed-on-exit
file handle, and passes connection settings as keyword arguments. The blob
hash on the index line of the new file predates these revisions.

 scripts/copy_classifications_from_files.py    | 52 ++++++++++++++++++++++++++
 ...=> save_classifications_chunk_in_files.py} | 16 --------
 2 files changed, 52 insertions(+), 16 deletions(-)
 create mode 100644 scripts/copy_classifications_from_files.py
 rename scripts/{backfill_classifications_chunk_in_files.py => save_classifications_chunk_in_files.py} (76%)

diff --git a/scripts/copy_classifications_from_files.py b/scripts/copy_classifications_from_files.py
new file mode 100644
index 0000000..46818bb
--- /dev/null
+++ b/scripts/copy_classifications_from_files.py
@@ -0,0 +1,52 @@
+"""Copy the exported classification CSV files into Timescale.
+
+Streams each prod_classifications_<n>.csv in the working directory into
+the classification_events table with COPY ... FROM STDIN.
+"""
+import os
+import psycopg
+from datetime import datetime
+import math
+
+TIMESCALE_CONNECTION = os.getenv('TIMESCALE_CONNECTION')
+TIMESCALE_PORT = os.getenv('TIMESCALE_PORT')
+ERAS_DB = os.getenv('ERAS_DB')
+ERAS_USER = os.getenv('ERAS_USER')
+ERAS_PW = os.getenv('ERAS_PW')
+# Environment values are strings; convert so the division below works.
+# Fails fast (KeyError/ValueError) when the variable is unset or not numeric.
+FIRST_INGESTED_CLASSIFICATION_ID = int(os.environ['FIRST_INGESTED_CLASSIFICATION_ID'])
+
+limit = 10000000
+num_files = math.ceil(FIRST_INGESTED_CLASSIFICATION_ID / limit)
+# Characters read per write, so a whole CSV is never held in memory.
+read_size = 1024 * 1024
+
+start_time = datetime.now()
+print("TIMESCALE START COPY FROM CSV BEFORE TIME =", start_time)
+
+# File numbers 0..num_files inclusive, the same range the loop covered before.
+# NOTE(review): confirm the export script writes exactly these files.
+for output_file_no in range(num_files + 1):
+    with open(f"prod_classifications_{output_file_no}.csv") as csv_file:
+        # Keyword arguments avoid quoting problems in the connection string.
+        # One connection per file: leaving the block commits that file's rows.
+        with psycopg.connect(
+            host=TIMESCALE_CONNECTION,
+            port=TIMESCALE_PORT,
+            dbname=ERAS_DB,
+            user=ERAS_USER,
+            password=ERAS_PW,
+            sslmode="require",
+        ) as timescale_db_conn:
+            with timescale_db_conn.cursor(name="timescale_cursor").copy(
+                "COPY classification_events FROM STDIN DELIMITER ',' CSV HEADER"
+            ) as timescale_copy:
+                while True:
+                    chunk = csv_file.read(read_size)
+                    if not chunk:
+                        break
+                    timescale_copy.write(chunk)
+
+finish_time = datetime.now()
+print("CLASSIFICATIONS TIMESCALE backfill AFTER Time =", finish_time)
diff --git a/scripts/backfill_classifications_chunk_in_files.py 
b/scripts/save_classifications_chunk_in_files.py similarity index 76% rename from scripts/backfill_classifications_chunk_in_files.py rename to scripts/save_classifications_chunk_in_files.py index 2458572..575cd25 100644 --- a/scripts/backfill_classifications_chunk_in_files.py +++ b/scripts/save_classifications_chunk_in_files.py @@ -8,11 +8,6 @@ PANOPTES_DB = os.getenv('PANOPTES_DB') PANOPTES_USER = os.getenv('PANOPTES_USER') PANOPTES_PW = os.getenv('PANOPTES_PW') -TIMESCALE_CONNECTION = os.getenv('TIMESCALE_CONNECTION') -TIMESCALE_PORT = os.getenv('TIMESCALE_PORT') -ERAS_DB = os.getenv('ERAS_DB') -ERAS_USER = os.getenv('ERAS_USER') -ERAS_PW = os.getenv('ERAS_PW') FIRST_INGESTED_CLASSIFICATION_ID = os.getenv('FIRST_INGESTED_CLASSIFICATION_ID') start_time = datetime.now() @@ -41,16 +36,5 @@ finish_copy_time = datetime.now() print("PANOPTES Classification Copy over at ", finish_copy_time) -print("TIMESCALE START COPY FROM CSV") - -output_file_no = 0 -while output_file_no <= num_files: - with psycopg.connect(f"host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW} sslmode=require") as timescale_db_conn: - with timescale_db_conn.cursor(name="timescale_cursor").copy("COPY classification_events FROM STDIN DELIMITER ',' CSV HEADER") as timescale_copy: - timescale_copy.write(open(f"prod_classifications_{output_file_no}.csv").read()) - output_file_no += 1 - -finish_time = datetime.now() -print("CLASSIFICATIONS TIMESCALE backfill AFTER Time =", finish_time)