From e0f99e2346b536537f162579a7b1aa016188f420 Mon Sep 17 00:00:00 2001
From: yuenmichelle1
Date: Wed, 18 Oct 2023 11:36:48 -0500
Subject: [PATCH] Update backfill scripts for classifications (#42)

* update classifications backfill script to chunk and save in file
* update to <= remove unused limit in query
* update copy into source
* split backfill to file creation then copy from files
* cast to int
* revert accidental adding commas on limit
* add keepalives to hopefully ensure connection does not get lost
---
 scripts/copy_classifications_from_files.py     | 2 +-
 scripts/save_classifications_chunk_in_files.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/scripts/copy_classifications_from_files.py b/scripts/copy_classifications_from_files.py
index 06e68b6..1a32079 100644
--- a/scripts/copy_classifications_from_files.py
+++ b/scripts/copy_classifications_from_files.py
@@ -18,7 +18,7 @@
 output_file_no = 0

 while output_file_no <= num_files:
-    with psycopg.connect(f"host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW} sslmode=require") as timescale_db_conn:
+    with psycopg.connect(f"host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW} sslmode=require keepalives=1 keepalives_idle=30 keepalives_interval=10 keepalives_count=20") as timescale_db_conn:
         with timescale_db_conn.cursor(name="timescale_cursor").copy("COPY classification_events FROM STDIN DELIMITER ',' CSV HEADER") as timescale_copy:
             timescale_copy.write(open(f"prod_classifications_{output_file_no}.csv").read())
         output_file_no += 1

diff --git a/scripts/save_classifications_chunk_in_files.py b/scripts/save_classifications_chunk_in_files.py
index 1ee7c81..72adc66 100644
--- a/scripts/save_classifications_chunk_in_files.py
+++ b/scripts/save_classifications_chunk_in_files.py
@@ -27,7 +27,7 @@
     else:
         query = f"{classifications_query} limit {limit} offset {limit * offset}"

-    with psycopg.connect(f"host={PANOPTES_CONN} port={PANOPTES_PORT} dbname={PANOPTES_DB} user={PANOPTES_USER} password={PANOPTES_PW} sslmode=require") as panoptes_db_conn:
+    with psycopg.connect(f"host={PANOPTES_CONN} port={PANOPTES_PORT} dbname={PANOPTES_DB} user={PANOPTES_USER} password={PANOPTES_PW} sslmode=require keepalives=1 keepalives_idle=30 keepalives_interval=10 keepalives_count=20") as panoptes_db_conn:
         with open(f"prod_classifications_{offset}.csv", "wb") as f:
             with panoptes_db_conn.cursor(name="panoptes_cursor").copy(f"COPY ({classifications_query}) TO STDOUT WITH CSV HEADER", (FIRST_INGESTED_CLASSIFICATION_ID,)) as panoptes_copy:
                 for data in panoptes_copy: