From 35d32d2e6fb8ae95ee42256983819d732f015b39 Mon Sep 17 00:00:00 2001 From: Aidan Lawford-Wickham Date: Mon, 20 May 2019 16:30:35 -0400 Subject: [PATCH] Allows multiple batch insertions in one loop Makes the following changes to the offset insertion script: - Adds a loop to complete multiple batch insertions (the whole table) in a single execution of the script - Fixes indentation errors (converts tabs to spaces) - Improves debugging output with item and batch counts --- submission_offsets.py | 120 ++++++++++++++++++++++++------------------ 1 file changed, 70 insertions(+), 50 deletions(-) diff --git a/submission_offsets.py b/submission_offsets.py index 70254686a..0e2c9186c 100644 --- a/submission_offsets.py +++ b/submission_offsets.py @@ -12,57 +12,77 @@ @cli.command(name='add-offsets') @click.option("--limit", "-l", default=10000) def add_offsets(limit): - """Update lowlevel submission offsets with a specified limit.""" - incremental_add_offset(limit) + """Update lowlevel submission offsets with a specified limit.""" + incremental_add_offset(limit) def incremental_add_offset(limit): - with db.engine.connect() as connection: - # Find the next batch of items to update - batch_query = text(""" - SELECT id, gid - FROM lowlevel - WHERE submission_offset IS NULL - LIMIT :limit - """) - batch_result = connection.execute(batch_query, { "limit": limit }) - - # Find max existing offsets - offset_query = text(""" - SELECT gid, MAX(submission_offset) - FROM lowlevel - WHERE submission_offset IS NOT NULL - GROUP BY gid - """) - offset_result = connection.execute(offset_query) - - max_offsets = defaultdict(int) - for gid, max_offset in offset_result.fetchall(): - max_offsets[gid] = max_offset - - print("Starting batch insertions...") - print("============================") - count = 0 - for id, gid in batch_result.fetchall(): - print("Finished {}/{} items".format(count, limit)) - print("Current lowlevel.id: {}".format(id)) - print("Inserting...") - if gid in max_offsets: - # Current offset 
exists - max_offsets[gid] += 1 - else: - # No existing offset - max_offsets[gid] = 0 - offset = max_offsets[gid] - - query = text(""" - UPDATE lowlevel - SET submission_offset = :offset - WHERE id = :id - """) - connection.execute(query, { "id": id, "offset": offset }) - count += 1 - - print("============================") - print("Batch insertions finished.") + with db.engine.connect() as connection: + + # Find number of items in table + size_query = text(""" + SELECT MAX(id) AS size + FROM lowlevel + """) + size_result = connection.execute(size_query) + table_size = size_result.fetchone()["size"] + + # Find max existing offsets + offset_query = text(""" + SELECT gid, MAX(submission_offset) + FROM lowlevel + WHERE submission_offset IS NOT NULL + GROUP BY gid + """) + offset_result = connection.execute(offset_query) + + max_offsets = defaultdict(int) + for gid, max_offset in offset_result.fetchall(): + max_offsets[gid] = max_offset + + # Find the next batch of items to update + batch_query = text(""" + SELECT id, gid + FROM lowlevel + WHERE submission_offset IS NULL + ORDER BY id + LIMIT :limit + """) + + batch_count = 0 + item_count = 0 + print("Starting batch insertions...") + print("============================") + while True: + batch_result = connection.execute(batch_query, { "limit": limit }) + if not batch_result.rowcount: + print("\nSubmission offset exists for all items. 
Exiting...") + break + + batch_count += 1 + print("\nUpdating batch {}:".format(batch_count)) + with connection.begin() as transaction: + for id, gid in batch_result.fetchall(): + if gid in max_offsets: + # Current offset exists + max_offsets[gid] += 1 + else: + # No existing offset + max_offsets[gid] = 0 + offset = max_offsets[gid] + + query = text(""" + UPDATE lowlevel + SET submission_offset = :offset + WHERE id = :id + """) + connection.execute(query, { "id": id, "offset": offset }) + item_count += 1 + print("\r Inserted {}/{} items...".format(item_count, table_size)), + print("") + + + print("============================") + print("Batch insertions finished.") +