
Commit

Draft commit -> Fix full export batch size limit + Trying to fix incremental

But the 1st batch has 501 entries, the 2nd has 500, the 3rd has 500, and the 4th has 405.

Understood the issue.

Right now we are segregating entries based on both time ranges and batch sizes.
For incremental export, both are in play and the logic is currently getting tangled.

For full export, mainly the batch size is in play, since end_ts is initially set to the current time.
But if the batch size exceeds the limit, we set end_ts to the timestamp of the last entry in the current batch.

Now, while run_purge_data_pipeline() is able to stop at the batch size limit, the existing export() script is not.
The export script just checks the timestamps and exports everything in that range.
Similarly, the delete function doesn't care about the batch size either and just deletes all matching entries within the time range.
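
For context, the pipeline-side capping works roughly like this (a sketch using the names from the purge_data.py diff below, not the full loop):

    # Sketch: cap the current batch at batch_size_limit and pull end_ts back
    # to the last entry actually included, so the export/delete range matches
    # the trimmed batch.
    if current_batch_size + len(new_entries) > batch_size_limit:
        # only take as many entries as still fit in this batch
        entries_to_process = new_entries[:batch_size_limit - current_batch_size]
        current_end_ts = entries_to_process[-1]['data']['ts']
    else:
        entries_to_process = new_entries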

A simple fix could be to limit the entries that are exported and deleted.

For export, just returning 500 entries for now in the export script. This works.

For delete, there is no limit flag.
Can try deleting only the matching IDs.
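
A minimal sketch of the ID-based delete, assuming a pymongo collection handle like the ts.timeseries_db used in purge_data.py (names mirror that diff; this is the idea, not the final implementation):

    # Delete only the entries that were actually exported in this batch,
    # instead of deleting everything in the time range.
    def delete_exported_entries(ts, entries_to_process):
        # collect the _ids of the exported entries
        ids_to_delete = [entry['_id'] for entry in entries_to_process]
        # delete_many with $in removes exactly those documents
        result = ts.timeseries_db.delete_many({'_id': {'$in': ids_to_delete}})
        return result.deleted_count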

-------

Trying to solve for incremental export.

But realized that we might not need the batch size here at all.
The batch_size default in load_multi_timeline_for_range isn't a hard cutoff that limits how much data gets processed; it just splits the data into batches within the script itself.

No need to handle it in the purge export script.
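
For reference, the loader-side batching boils down to chunking by index; get_load_ranges below is the same helper touched in import_timeseries.py, and the 1906/500 numbers are just the sample values from the tests:

    # batch_size only controls how entries are chunked; every entry is still processed
    def get_load_ranges(entries, batch_size):
        start_indices = list(range(0, len(entries), batch_size))   # e.g. [0, 500, 1000, 1500]
        ranges = list(zip(start_indices, start_indices[1:]))       # [(0, 500), (500, 1000), (1000, 1500)]
        ranges.append((start_indices[-1], len(entries)))           # final partial batch (1500, 1906)
        return ranges

    print(get_load_ranges(list(range(1906)), 500))
    # [(0, 500), (500, 1000), (1000, 1500), (1500, 1906)]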

----------

Also, can simplify the delete function in purge.

-------

New test added for the batch size limit.
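
For reference, with the 1906-entry sample dataset and a batch_size_limit of 500, the full-export test below expects (1906 // 500) + 1 = 3 + 1 = 4 archive files: three full batches of 500 plus a final batch of 406 entries.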

------

Just committing code here for reference.
Mahadik, Mukul Chandrakant authored and committed Aug 31, 2024
1 parent 34ab73d commit 4ab627b
Showing 5 changed files with 161 additions and 5 deletions.
3 changes: 2 additions & 1 deletion emission/purge_restore/export_timeseries.py
@@ -37,7 +37,8 @@ def get_with_retry(retrieve_call, in_query):
done = True
else:
query.startTs = curr_batch[-1][timeTypeSplit[0]][timeTypeSplit[1]]
return list_so_far

return list_so_far[:501]  # temporary hard cap on the number of returned entries (see commit notes above)

def get_from_all_three_sources_with_retry(user_id, in_query, databases=None):
import emission.storage.timeseries.builtin_timeseries as estb
2 changes: 2 additions & 0 deletions emission/purge_restore/import_timeseries.py
@@ -58,6 +58,8 @@ def get_load_ranges(entries, batch_size):
start_indices = list(range(0, len(entries), batch_size))
ranges = list(zip(start_indices, start_indices[1:]))
ranges.append((start_indices[-1], len(entries)))
print("Start indices are %s" % start_indices)
print("Ranges are %s" % ranges)
return ranges

# def load_pipeline_states(file_prefix, all_uuid_list, continue_on_error, verbose):
102 changes: 99 additions & 3 deletions emission/purge_restore/purge_data.py
@@ -63,8 +63,78 @@ def run_purge_data_pipeline(self, user_id, archive_dir, export_type):
current_start_ts = initStartTs if initStartTs is not None else entries_to_export[0]['data']['ts']
batch_start_ts = current_start_ts
current_batch_size = 0
batch_time_ranges = []
# batch_time_ranges = []
batch_time_range = ()
entries_to_process = []

while current_start_ts < initEndTs:
current_end_ts = min(current_start_ts + 3600, initEndTs) if export_type == 'incremental' else initEndTs

new_entries = self.get_export_timeseries_entries(user_id, ts, current_start_ts, current_end_ts)

if new_entries:
if current_batch_size + len(new_entries) > self.batch_size_limit:
# If adding new entries exceeds the batch size limit
entries_to_process = new_entries[:self.batch_size_limit - current_batch_size]
current_end_ts = entries_to_process[-1]['data']['ts']
else:
entries_to_process = new_entries

current_batch_size += len(entries_to_process)
self._last_processed_ts = entries_to_process[-1]['data']['ts']
print(f"Updated last_processed_ts {self._last_processed_ts}")

# batch_time_ranges.append((batch_start_ts, current_end_ts))
batch_time_range = (batch_start_ts, current_end_ts)

if current_batch_size >= self.batch_size_limit or current_end_ts >= initEndTs:
print("Exporting batch size of %s entries from %s to %s" % (current_batch_size, batch_start_ts, current_end_ts))
# file_names.extend(self.export_batches(user_id, ts, batch_time_ranges, archive_dir))
file_names.extend(self.export_batches(user_id, ts, batch_time_range, archive_dir))
# self.delete_batches(user_id, ts, batch_time_ranges)
# self.delete_batches(user_id, ts, batch_time_range)

ids_to_delete = [entry['_id'] for entry in entries_to_process]
self.delete_timeseries_entries(user_id, ts, ids_to_delete, batch_time_range[0], batch_time_range[1])

# batch_time_ranges = []
batch_time_range = ()

current_batch_size = 0
batch_start_ts = current_end_ts

current_start_ts = current_end_ts

else:
remaining_entries = self.get_export_timeseries_entries(user_id, ts, current_start_ts, initEndTs)
if not remaining_entries:
print("No new data to export, breaking out of while loop")
if current_batch_size > 0:
# batch_time_ranges.append((batch_start_ts, current_start_ts))
batch_time_range = (batch_start_ts, current_start_ts)
break
else:
print(f"No entries found in current time range from {current_start_ts} to {current_end_ts}")
print("Incrementing time range")
current_start_ts = current_end_ts

# Export any remaining batches
# if batch_time_ranges:
if batch_time_range:
print("Exporting accumulated entries of batch size of %s entries from %s to %s" % (current_batch_size, batch_start_ts, current_start_ts))
# file_names.extend(self.export_batches(user_id, ts, batch_time_ranges, archive_dir))
# self.delete_batches(user_id, ts, batch_time_ranges)
file_names.extend(self.export_batches(user_id, ts, batch_time_range, archive_dir))
# self.delete_batches(user_id, ts, batch_time_range)
ids_to_delete = [entry['_id'] for entry in entries_to_process]
self.delete_timeseries_entries(user_id, ts, ids_to_delete, batch_time_range[0], batch_time_range[1])

print(f"Exported data to {len(file_names)} files")
print(f"Exported file names: {file_names}")
return file_names


'''
while current_start_ts < initEndTs:
current_end_ts = min(current_start_ts + 3600, initEndTs) if export_type == 'incremental' else initEndTs
@@ -105,11 +175,26 @@ def run_purge_data_pipeline(self, user_id, archive_dir, export_type):
print("Exporting accumulated entries of batch size of %s entries from %s to %s" % (current_batch_size, batch_start_ts, current_start_ts))
file_names.extend(self.export_batches(user_id, ts, batch_time_ranges, archive_dir))
self.delete_batches(user_id, ts, batch_time_ranges)

print(f"Exported data to {len(file_names)} files")
print(f"Exported file names: {file_names}")
return file_names
'''

def export_batches(self, user_id, ts, batch_time_range, archive_dir):
file_names = []
(start_ts, end_ts) = batch_time_range
file_name = archive_dir + "/archive_%s_%s_%s" % (user_id, start_ts, end_ts)
print(f"Exporting entries from {start_ts} to {end_ts} to file: {file_name}")
epret.export(user_id, ts, start_ts, end_ts, file_name)
file_names.append(file_name)
return file_names

# def delete_batches(self, user_id, ts, batch_time_range):
# (start_ts, end_ts) = batch_time_range
# self.delete_timeseries_entries(user_id, ts, start_ts, end_ts)

'''
def export_batches(self, user_id, ts, batch_time_ranges, archive_dir):
file_names = []
for start_ts, end_ts in batch_time_ranges:
@@ -122,6 +207,7 @@ def export_batches(self, user_id, ts, batch_time_ranges, archive_dir):
def delete_batches(self, user_id, ts, batch_time_ranges):
for start_ts, end_ts in batch_time_ranges:
self.delete_timeseries_entries(user_id, ts, start_ts, end_ts)
'''

# def export_pipeline_states(self, user_id, file_name):
# pipeline_state_list = list(edb.get_pipeline_state_db().find({"user_id": user_id}))
@@ -133,6 +219,15 @@ def delete_batches(self, user_id, ts, batch_time_ranges):
# json.dump(pipeline_state_list,
# gpfd, default=esj.wrapped_default, allow_nan=False, indent=4)

def delete_timeseries_entries(self, user_id, ts, ids_to_delete, start_ts_datetime, end_ts_datetime):
logging.debug("Deleting entries from database...")
print("Deleting entries from database...")
print("Number of entries to delete: %s" % len(ids_to_delete))
result = ts.timeseries_db.delete_many({'_id': {'$in': ids_to_delete}})
logging.debug("{} deleted entries from {} to {}".format(result.deleted_count, start_ts_datetime, end_ts_datetime))
print("{} deleted entries from {} to {}".format(result.deleted_count, start_ts_datetime, end_ts_datetime))

'''
def delete_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_datetime):
export_queries = self.get_export_queries(start_ts_datetime, end_ts_datetime)
for key, value in export_queries.items():
@@ -151,7 +246,8 @@ def delete_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_datetime):
print(f"Key query: {key}")
logging.debug("{} deleted entries from {} to {}".format(result.deleted_count, start_ts_datetime, end_ts_datetime))
print("{} deleted entries from {} to {}".format(result.deleted_count, start_ts_datetime, end_ts_datetime))

'''

def get_export_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_datetime):
entries_to_export = []
export_queries = self.get_export_queries(start_ts_datetime, end_ts_datetime)
2 changes: 1 addition & 1 deletion emission/purge_restore/restore_data.py
@@ -43,7 +43,7 @@ def run_restore_data_pipeline(self, user_id, file_names):
# self._last_processed_ts = pipelineState["last_processed_ts"]
# logging.debug("Restoring from file, last_processed_ts = %s" % (self._last_processed_ts))
# (tsdb_count, ucdb_count) = lmtfr.load_multi_timeline_for_range(file_prefix=file_name, continue_on_error=True)
(tsdb_count, ucdb_count) = eprit.load_multi_timeline_for_range(file_prefix=file_name, continue_on_error=True)
(tsdb_count, ucdb_count) = eprit.load_multi_timeline_for_range(file_prefix=file_name, continue_on_error=True, batch_size=500)
print("After load, tsdb_count = %s, ucdb_count = %s" % (tsdb_count, ucdb_count))
if tsdb_count == 0:
# Didn't process anything new so start at the same point next time
57 changes: 57 additions & 0 deletions emission/tests/exportTests/TestPurgeRestoreModule.py
@@ -221,6 +221,63 @@ def testPurgeRestorePipelineIncremental(self):
self.assertEqual(res_stats_count, 2)
self.assertEqual(res, 1908)

def testPurgeRestorePipelineFullBatchExport(self):
pdp = eprpd.PurgeDataPipeline()
pdp.batch_size_limit = 500

'''
Test 1 - Verify that purging timeseries data works with sample real data
'''
# Check how much data there was before
res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID})
logging.info(f"About to purge {res} entries")
print(f"About to purge {res} entries")
self.assertEqual(res, 1906)

# Run the purge pipeline
file_names = pdp.run_purge_data_pipeline(self.testUUID, os.environ.get('DATA_DIR', 'emission/archived'), "full")
print("Exported file names: %s" % file_names)

'''
Test 2 - Assert the files exist after the export process
'''
# Sample data has 1906 raw entries, batch size limit is 500
# We should have (1906 / 500) + 1 = 3 + 1 = 4 files
# (1 extra file for the remaining 406 entries in the last batch)
self.assertEqual(len(file_names), 4)
for file_name in file_names:
self.assertTrue(pl.Path(file_name + ".gz").is_file())

'''
Test 3 - Verify that purging timeseries data works with sample real data
'''
# Check how much data there is after
entries = edb.get_timeseries_db().find({"user_id" : self.testUUID})
res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID})
logging.info(f"Purging complete: {res} entries remaining")
print(f"Purging complete: {res} entries remaining")
self.assertEqual(res, 0)

# Run the restore pipeline
logging.info(f"About to restore entries")
print(f"About to restore entries")
epr.run_restore_pipeline_for_user(self.testUUID, file_names)

'''
Test 4 - Verify that restoring timeseries data works with sample real data
'''
# Check how much data there is after
res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID})
res_stats_count = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID, "metadata.key" : 'stats/pipeline_time'})
logging.info(f"Restoring complete: {res} entries restored")
print(f"Restoring complete: {res} entries restored")

# A single additional entry with key 'stats/pipeline_time' should be present as this test involves running only the restore pipeline
print(f"res_stats_count = {res_stats_count}")
self.assertEqual(res_stats_count, 1)
self.assertEqual(res, 1907)


def testPurgeRestorePipelineIncrementalBatchExport(self):
pdp = eprpd.PurgeDataPipeline()
pdp.batch_size_limit = 500

