Skip to content

Commit

Permalink
Allows multiple batch insertions in one loop
Browse files Browse the repository at this point in the history
The following changes to offset insertion script:

- Adds a loop to complete multiple batch insertions (the whole
  table) in a single execution of the script
- Fixes indentation errors (convert to spaces)
- Improves debugging outputs with item and batch counts
  • Loading branch information
aidanlw17 committed May 20, 2019
1 parent c05e347 commit 35d32d2
Showing 1 changed file with 70 additions and 50 deletions.
120 changes: 70 additions & 50 deletions submission_offsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,57 +12,77 @@
@cli.command(name='add-offsets')
@click.option("--limit", "-l", default=10000)
def add_offsets(limit):
"""Update lowlevel submission offsets with a specified limit."""
incremental_add_offset(limit)
"""Update lowlevel submission offsets with a specified limit."""
incremental_add_offset(limit)

def incremental_add_offset(limit):
with db.engine.connect() as connection:
# Find the next batch of items to update
batch_query = text("""
SELECT id, gid
FROM lowlevel
WHERE submission_offset IS NULL
LIMIT :limit
""")
batch_result = connection.execute(batch_query, { "limit": limit })

# Find max existing offsets
offset_query = text("""
SELECT gid, MAX(submission_offset)
FROM lowlevel
WHERE submission_offset IS NOT NULL
GROUP BY gid
""")
offset_result = connection.execute(offset_query)

max_offsets = defaultdict(int)
for gid, max_offset in offset_result.fetchall():
max_offsets[gid] = max_offset

print("Starting batch insertions...")
print("============================")
count = 0
for id, gid in batch_result.fetchall():
print("Finished {}/{} items".format(count, limit))
print("Current lowlevel.id: {}".format(id))
print("Inserting...")
if gid in max_offsets:
# Current offset exists
max_offsets[gid] += 1
else:
# No existing offset
max_offsets[gid] = 0
offset = max_offsets[gid]

query = text("""
UPDATE lowlevel
SET submission_offset = :offset
WHERE id = :id
""")
connection.execute(query, { "id": id, "offset": offset })
count += 1

print("============================")
print("Batch insertions finished.")
with db.engine.connect() as connection:

# Find number of items in table
size_query = text("""
SELECT MAX(id) AS size
FROM lowlevel
""")
size_result = connection.execute(size_query)
table_size = size_result.fetchone()["size"]

# Find max existing offsets
offset_query = text("""
SELECT gid, MAX(submission_offset)
FROM lowlevel
WHERE submission_offset IS NOT NULL
GROUP BY gid
""")
offset_result = connection.execute(offset_query)

max_offsets = defaultdict(int)
for gid, max_offset in offset_result.fetchall():
max_offsets[gid] = max_offset

# Find the next batch of items to update
batch_query = text("""
SELECT id, gid
FROM lowlevel
WHERE submission_offset IS NULL
ORDER BY id
LIMIT :limit
""")

batch_count = 0
item_count = 0
print("Starting batch insertions...")
print("============================")
while True:
batch_result = connection.execute(batch_query, { "limit": limit })
if not batch_result.rowcount:
print("\nSubmission offset exists for all items. Exiting...")
break

batch_count += 1
print("\nUpdating batch {}:".format(batch_count))
with connection.begin() as transaction:
for id, gid in batch_result.fetchall():
if gid in max_offsets:
# Current offset exists
max_offsets[gid] += 1
else:
# No existing offset
max_offsets[gid] = 0
offset = max_offsets[gid]

query = text("""
UPDATE lowlevel
SET submission_offset = :offset
WHERE id = :id
""")
connection.execute(query, { "id": id, "offset": offset })
item_count += 1
print("\r Inserted {}/{} items...".format(item_count, table_size)),
print("")


print("============================")
print("Batch insertions finished.")



0 comments on commit 35d32d2

Please sign in to comment.