From 02fb2ce52ea8d8e88e1e2fa0490c1f5eb19a42ee Mon Sep 17 00:00:00 2001 From: "Mahadik, Mukul Chandrakant" Date: Sat, 31 Aug 2024 01:06:37 -0700 Subject: [PATCH] Removed batch size limit Realized that we might not need the batch size at all. The batch_size default in load_multi_timeline_for_range isn't a fixed cutoff that it'll only process the limited data. It just separates the data into batches in the script itself. ------ Will clean up code in next commit. --- emission/purge_restore/export_timeseries.py | 2 +- emission/purge_restore/import_timeseries.py | 2 - emission/purge_restore/purge_data.py | 215 ++++++------------ emission/purge_restore/restore_data.py | 2 +- .../exportTests/TestPurgeRestoreModule.py | 115 ---------- 5 files changed, 76 insertions(+), 260 deletions(-) diff --git a/emission/purge_restore/export_timeseries.py b/emission/purge_restore/export_timeseries.py index d7ebafd25..fc2bc596e 100644 --- a/emission/purge_restore/export_timeseries.py +++ b/emission/purge_restore/export_timeseries.py @@ -38,7 +38,7 @@ def get_with_retry(retrieve_call, in_query): else: query.startTs = curr_batch[-1][timeTypeSplit[0]][timeTypeSplit[1]] - return list_so_far[:501] + return list_so_far def get_from_all_three_sources_with_retry(user_id, in_query, databases=None): import emission.storage.timeseries.builtin_timeseries as estb diff --git a/emission/purge_restore/import_timeseries.py b/emission/purge_restore/import_timeseries.py index 22626e1b0..6870ca5a3 100644 --- a/emission/purge_restore/import_timeseries.py +++ b/emission/purge_restore/import_timeseries.py @@ -58,8 +58,6 @@ def get_load_ranges(entries, batch_size): start_indices = list(range(0, len(entries), batch_size)) ranges = list(zip(start_indices, start_indices[1:])) ranges.append((start_indices[-1], len(entries))) - print("Start indices are %s" % start_indices) - print("Ranges are %s" % ranges) return ranges # def load_pipeline_states(file_prefix, all_uuid_list, continue_on_error, verbose): diff --git a/emission/purge_restore/purge_data.py b/emission/purge_restore/purge_data.py index 51fb70acd..9e7e417c7 100644 --- a/emission/purge_restore/purge_data.py +++ b/emission/purge_restore/purge_data.py @@ -33,7 +33,6 @@ def purge_data(user_id, archive_dir, export_type): class PurgeDataPipeline: def __init__(self): self._last_processed_ts = None - self.batch_size_limit = 10000 @property def last_processed_ts(self): @@ -56,158 +55,102 @@ def run_purge_data_pipeline(self, user_id, archive_dir, export_type): file_names = [] entries_to_export = self.get_export_timeseries_entries(user_id, ts, time_query.startTs, time_query.endTs) - # count_entries = len(entries_to_export) + count_entries = len(entries_to_export) # If running the pipeline PURGE stage for first time, choose the first timestamp from the timeseries as the starting point # Otherwise cannot add 1 hour (3600 seconds) to a NoneType value if incremental option is selected current_start_ts = initStartTs if initStartTs is not None else entries_to_export[0]['data']['ts'] - batch_start_ts = current_start_ts - current_batch_size = 0 - # batch_time_ranges = [] - batch_time_range = () - entries_to_process = [] - while current_start_ts < initEndTs: + # while current_start_ts < initEndTs: + while True: + print("Inside while loop: current_start_ts = %s" % current_start_ts) current_end_ts = min(current_start_ts + 3600, initEndTs) if export_type == 'incremental' else initEndTs - new_entries = self.get_export_timeseries_entries(user_id, ts, current_start_ts, current_end_ts) - - if new_entries: - 
if current_batch_size + len(new_entries) > self.batch_size_limit: - # If adding new entries exceeds the batch size limit - entries_to_process = new_entries[:self.batch_size_limit - current_batch_size] - current_end_ts = entries_to_process[-1]['data']['ts'] - else: - entries_to_process = new_entries - - current_batch_size += len(entries_to_process) - self._last_processed_ts = entries_to_process[-1]['data']['ts'] - print(f"Updated last_processed_ts {self._last_processed_ts}") - - # batch_time_ranges.append((batch_start_ts, current_end_ts)) - batch_time_range = (batch_start_ts, current_end_ts) + if export_type == 'incremental': + current_end_ts = min(current_start_ts + 3600, initEndTs) + print("Inside export_type incremental, increasing current_end_ts: %s" % current_end_ts) + elif export_type == 'full': + current_end_ts = initEndTs + print("Inside export_type full, setting current_end_ts to current time: %s" % current_end_ts) + else: + raise ValueError("Unknown export_type %s" % export_type) + + print(f"Processing data from {current_start_ts} to {current_end_ts}") - if current_batch_size >= self.batch_size_limit or current_end_ts >= initEndTs: - print("Exporting batch size of %s entries from %s to %s" % (current_batch_size, batch_start_ts, current_end_ts)) - # file_names.extend(self.export_batches(user_id, ts, batch_time_ranges, archive_dir)) - file_names.extend(self.export_batches(user_id, ts, batch_time_range, archive_dir)) - # self.delete_batches(user_id, ts, batch_time_ranges) - # self.delete_batches(user_id, ts, batch_time_range) - - ids_to_delete = [entry['_id'] for entry in entries_to_process] - self.delete_timeseries_entries(user_id, ts, ids_to_delete, batch_time_range[0], batch_time_range[1]) - - # batch_time_ranges = [] - batch_time_range = () + file_name = archive_dir + "/archive_%s_%s_%s" % (user_id, current_start_ts, current_end_ts) + export_queries = epret.export(user_id, ts, current_start_ts, current_end_ts, file_name) + # epret.export(user_id, ts, current_start_ts, current_end_ts, file_name) - current_batch_size = 0 - batch_start_ts = current_end_ts + entries_to_export_1 = self.get_export_timeseries_entries(user_id, ts, time_query.startTs, time_query.endTs) + count_entries_1 = len(entries_to_export_1) + if export_queries is None and count_entries_1 > 0: + print("No entries found in current time range from %s to %s" % (current_start_ts, current_end_ts)) + print("Incrementing time range by 1 hour") current_start_ts = current_end_ts + continue + # if count_entries_2 == 0 and count_entries_1 == 0: + elif export_queries is None and count_entries_1 == 0: + # Didn't process anything new so start at the same point next time + # self._last_processed_ts = None + logging.debug("No new data to export, breaking out of while loop") + print("No new data to export, breaking out of while loop") + break + + entries_to_export_2 = self.get_export_timeseries_entries(user_id, ts, current_start_ts, current_end_ts) + count_entries_2 = len(entries_to_export_2) + print("count_entries_2 = %s" % count_entries_2) - else: - remaining_entries = self.get_export_timeseries_entries(user_id, ts, current_start_ts, initEndTs) - if not remaining_entries: - print("No new data to export, breaking out of while loop") - if current_batch_size > 0: - # batch_time_ranges.append((batch_start_ts, current_start_ts)) - batch_time_range = (batch_start_ts, current_start_ts) - break - else: - print(f"No entries found in current time range from {current_start_ts} to {current_end_ts}") - print("Incrementing time range") - 
current_start_ts = current_end_ts - - # Export any remaining batches - # if batch_time_ranges: - if batch_time_range: - print("Exporting accumulated entries of batch size of %s entries from %s to %s" % (current_batch_size, batch_start_ts, current_start_ts)) - # file_names.extend(self.export_batches(user_id, ts, batch_time_ranges, archive_dir)) - # self.delete_batches(user_id, ts, batch_time_ranges) - file_names.extend(self.export_batches(user_id, ts, batch_time_range, archive_dir)) - # self.delete_batches(user_id, ts, batch_time_range) - ids_to_delete = [entry['_id'] for entry in entries_to_process] - self.delete_timeseries_entries(user_id, ts, ids_to_delete, batch_time_range[0], batch_time_range[1]) + + logging.debug("Exporting to file: %s" % file_name) + print("Exporting to file: %s" % file_name) + file_names.append(file_name) + print("File names: %s" % file_names) - print(f"Exported data to {len(file_names)} files") - print(f"Exported file names: {file_names}") - return file_names + self.delete_timeseries_entries(user_id, ts, current_start_ts, current_end_ts) + print("Total entries to export: %s" % count_entries) + print("Entries exported in timerange %s to %s: %s" % (current_start_ts, current_end_ts, count_entries_2)) + print("New count entries to export: %s" % count_entries_1) + self._last_processed_ts = entries_to_export_2[-1]['data']['ts'] + print("Updated last_processed_ts %s" % self._last_processed_ts) - ''' - while current_start_ts < initEndTs: - current_end_ts = min(current_start_ts + 3600, initEndTs) if export_type == 'incremental' else initEndTs - - new_entries = self.get_export_timeseries_entries(user_id, ts, current_start_ts, current_end_ts) - - if new_entries: - current_batch_size += len(new_entries) - logging.debug("Current batch size = %s" % current_batch_size) - print("Current batch size = %s" % current_batch_size) - self._last_processed_ts = new_entries[-1]['data']['ts'] - logging.debug(f"Updated last_processed_ts {self._last_processed_ts}") - print(f"Updated last_processed_ts {self._last_processed_ts}") + current_start_ts = current_end_ts + if current_start_ts >= initEndTs: + break - if current_batch_size >= self.batch_size_limit or current_end_ts >= initEndTs: - batch_time_ranges.append((batch_start_ts, current_end_ts)) - print("Exporting batch size of %s entries from %s to %s" % (current_batch_size, batch_start_ts, current_end_ts)) - file_names.extend(self.export_batches(user_id, ts, batch_time_ranges, archive_dir)) - self.delete_batches(user_id, ts, batch_time_ranges) - - batch_time_ranges = [] - current_batch_size = 0 - batch_start_ts = current_end_ts + print("Exported data to %s files" % len(file_names)) + print("Exported file names: %s" % file_names) + return file_names - current_start_ts = current_end_ts - else: - remaining_entries = self.get_export_timeseries_entries(user_id, ts, current_start_ts, initEndTs) - if not remaining_entries: - print("No new data to export, breaking out of while loop") - if current_batch_size > 0: - batch_time_ranges.append((batch_start_ts, current_start_ts)) - break - else: - print(f"No entries found in current time range from {current_start_ts} to {current_end_ts}") - print("Incrementing time range") - current_start_ts = current_end_ts + # new_entries = self.get_export_timeseries_entries(user_id, ts, current_start_ts, current_end_ts) - if batch_time_ranges: - print("Exporting accumulated entries of batch size of %s entries from %s to %s" % (current_batch_size, batch_start_ts, current_start_ts)) - 
file_names.extend(self.export_batches(user_id, ts, batch_time_ranges, archive_dir)) - self.delete_batches(user_id, ts, batch_time_ranges) - - print(f"Exported data to {len(file_names)} files") - print(f"Exported file names: {file_names}") - return file_names - ''' + # if new_entries: + # self._last_processed_ts = new_entries[-1]['data']['ts'] + # print(f"Updated last_processed_ts {self._last_processed_ts}") - def export_batches(self, user_id, ts, batch_time_range, archive_dir): - file_names = [] - (start_ts, end_ts) = batch_time_range - file_name = archive_dir + "/archive_%s_%s_%s" % (user_id, start_ts, end_ts) - print(f"Exporting entries from {start_ts} to {end_ts} to file: {file_name}") - epret.export(user_id, ts, start_ts, end_ts, file_name) - file_names.append(file_name) - return file_names + # if current_end_ts >= initEndTs: + # file_name = archive_dir + "/archive_%s_%s_%s" % (user_id, current_start_ts, current_end_ts) + # print(f"Exporting entries from {current_start_ts} to {current_end_ts} to file: {file_name}") + # epret.export(user_id, ts, current_start_ts, current_end_ts, file_name) + # file_names.append(file_name) + # self.delete_timeseries_entries(user_id, ts, current_start_ts, current_end_ts) - # def delete_batches(self, user_id, ts, batch_time_range): - # (start_ts, end_ts) = batch_time_range - # self.delete_timeseries_entries(user_id, ts, start_ts, end_ts) + # current_start_ts = current_end_ts - ''' - def export_batches(self, user_id, ts, batch_time_ranges, archive_dir): - file_names = [] - for start_ts, end_ts in batch_time_ranges: - file_name = archive_dir + "/archive_%s_%s_%s" % (user_id, start_ts, end_ts) - print(f"Exporting entries from {start_ts} to {end_ts} to file: {file_name}") - epret.export(user_id, ts, start_ts, end_ts, file_name) - file_names.append(file_name) - return file_names + # else: + # remaining_entries = self.get_export_timeseries_entries(user_id, ts, initStartTs, initEndTs) + # if not remaining_entries: + # print("No new data to export, breaking out of while loop") + # break + # else: + # print(f"No entries found in current time range from {current_start_ts} to {current_end_ts}") + # print("Incrementing time range") + # current_start_ts = current_end_ts - def delete_batches(self, user_id, ts, batch_time_ranges): - for start_ts, end_ts in batch_time_ranges: - self.delete_timeseries_entries(user_id, ts, start_ts, end_ts) - ''' + # print(f"Exported data to {len(file_names)} files") + # print(f"Exported file names: {file_names}") + # return file_names # def export_pipeline_states(self, user_id, file_name): # pipeline_state_list = list(edb.get_pipeline_state_db().find({"user_id": user_id})) @@ -219,15 +162,6 @@ def delete_batches(self, user_id, ts, batch_time_ranges): # json.dump(pipeline_state_list, # gpfd, default=esj.wrapped_default, allow_nan=False, indent=4) - def delete_timeseries_entries(self, user_id, ts, ids_to_delete, start_ts_datetime, end_ts_datetime): - logging.debug("Deleting entries from database...") - print("Deleting entries from database...") - print("Number of entries to delete: %s" % len(ids_to_delete)) - result = ts.timeseries_db.delete_many({'_id': {'$in': ids_to_delete}}) - logging.debug("{} deleted entries from {} to {}".format(result.deleted_count, start_ts_datetime, end_ts_datetime)) - print("{} deleted entries from {} to {}".format(result.deleted_count, start_ts_datetime, end_ts_datetime)) - - ''' def delete_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_datetime): export_queries = 
self.get_export_queries(start_ts_datetime, end_ts_datetime) for key, value in export_queries.items(): @@ -246,7 +180,6 @@ def delete_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_datet print(f"Key query: {key}") logging.debug("{} deleted entries from {} to {}".format(result.deleted_count, start_ts_datetime, end_ts_datetime)) print("{} deleted entries from {} to {}".format(result.deleted_count, start_ts_datetime, end_ts_datetime)) - ''' def get_export_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_datetime): entries_to_export = [] diff --git a/emission/purge_restore/restore_data.py b/emission/purge_restore/restore_data.py index e541b4790..2758336a9 100644 --- a/emission/purge_restore/restore_data.py +++ b/emission/purge_restore/restore_data.py @@ -43,7 +43,7 @@ def run_restore_data_pipeline(self, user_id, file_names): # self._last_processed_ts = pipelineState["last_processed_ts"] # logging.debug("Restoring from file, last_processed_ts = %s" % (self._last_processed_ts)) # (tsdb_count, ucdb_count) = lmtfr.load_multi_timeline_for_range(file_prefix=file_name, continue_on_error=True) - (tsdb_count, ucdb_count) = eprit.load_multi_timeline_for_range(file_prefix=file_name, continue_on_error=True, batch_size=500) + (tsdb_count, ucdb_count) = eprit.load_multi_timeline_for_range(file_prefix=file_name, continue_on_error=True) print("After load, tsdb_count = %s, ucdb_count = %s" % (tsdb_count, ucdb_count)) if tsdb_count == 0: # Didn't process anything new so start at the same point next time diff --git a/emission/tests/exportTests/TestPurgeRestoreModule.py b/emission/tests/exportTests/TestPurgeRestoreModule.py index 16825e4d2..c18e860be 100644 --- a/emission/tests/exportTests/TestPurgeRestoreModule.py +++ b/emission/tests/exportTests/TestPurgeRestoreModule.py @@ -160,7 +160,6 @@ def testPurgeRestorePipelineFull(self): print(f"Restoring complete: {res} entries restored") # Two additional entries with key 'stats/pipeline_time' should be present - one from the purge pipeline, other from the restore pipeline - print(f"res_stats_count = {res_stats_count}") self.assertEqual(res_stats_count, 2) self.assertEqual(res, 1908) @@ -217,123 +216,9 @@ def testPurgeRestorePipelineIncremental(self): print(f"Restoring complete: {res} entries restored") # Two additional entries with key 'stats/pipeline_time' should be present - one from the purge pipeline, other from the restore pipeline - print(f"res_stats_count = {res_stats_count}") self.assertEqual(res_stats_count, 2) self.assertEqual(res, 1908) - def testPurgeRestorePipelineFullBatchExport(self): - pdp = eprpd.PurgeDataPipeline() - pdp.batch_size_limit = 500 - - ''' - Test 1 - Verify that purging timeseries data works with sample real data - ''' - # Check how much data there was before - res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) - logging.info(f"About to purge {res} entries") - print(f"About to purge {res} entries") - self.assertEqual(res, 1906) - - # Run the purge pipeline - file_names = pdp.run_purge_data_pipeline(self.testUUID, os.environ.get('DATA_DIR', 'emission/archived'), "full") - print("Exported file names: %s" % file_names) - - ''' - Test 2 - Assert the files exist after the export process - ''' - # Sample data has 1906 raw entries, batch size limit is 500 - # We should have (1906 / 500) + 1 = 3 + 1 = 4 files - # 1 extra file for the remaining 406 entries in the last batch) - self.assertEqual(len(file_names), 4) - for file_name in file_names: - self.assertTrue(pl.Path(file_name + 
".gz").is_file()) - - ''' - Test 3 - Verify that purging timeseries data works with sample real data - ''' - # Check how much data there is after - entries = edb.get_timeseries_db().find({"user_id" : self.testUUID}) - res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) - logging.info(f"Purging complete: {res} entries remaining") - print(f"Purging complete: {res} entries remaining") - self.assertEqual(res, 0) - - # Run the restore pipeline - logging.info(f"About to restore entries") - print(f"About to restore entries") - epr.run_restore_pipeline_for_user(self.testUUID, file_names) - - ''' - Test 4 - Verify that restoring timeseries data works with sample real data - ''' - # Check how much data there is after - res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) - res_stats_count = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID, "metadata.key" : 'stats/pipeline_time'}) - logging.info(f"Restoring complete: {res} entries restored") - print(f"Restoring complete: {res} entries restored") - - # A single additional entry with key 'stats/pipeline_time' should be present as this test involves running only the restore pipeline - print(f"res_stats_count = {res_stats_count}") - self.assertEqual(res_stats_count, 1) - self.assertEqual(res, 1907) - - - def testPurgeRestorePipelineIncrementalBatchExport(self): - pdp = eprpd.PurgeDataPipeline() - pdp.batch_size_limit = 500 - - ''' - Test 1 - Verify that purging timeseries data works with sample real data - ''' - # Check how much data there was before - res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) - logging.info(f"About to purge {res} entries") - print(f"About to purge {res} entries") - self.assertEqual(res, 1906) - - # Run the purge pipeline - file_names = pdp.run_purge_data_pipeline(self.testUUID, os.environ.get('DATA_DIR', 'emission/archived'), "incremental") - print("Exported file names: %s" % file_names) - - ''' - Test 2 - Assert the files exist after the export process - ''' - # Sample data has 1906 raw entries, batch size limit is 500 - # We should have (1906 / 500) + 1 = 3 + 1 = 4 files - # 1 extra file for the remaining 406 entries in the last batch) - self.assertEqual(len(file_names), 4) - for file_name in file_names: - self.assertTrue(pl.Path(file_name + ".gz").is_file()) - - ''' - Test 3 - Verify that purging timeseries data works with sample real data - ''' - # Check how much data there is after - entries = edb.get_timeseries_db().find({"user_id" : self.testUUID}) - res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) - logging.info(f"Purging complete: {res} entries remaining") - print(f"Purging complete: {res} entries remaining") - self.assertEqual(res, 0) - - # Run the restore pipeline - logging.info(f"About to restore entries") - print(f"About to restore entries") - epr.run_restore_pipeline_for_user(self.testUUID, file_names) - - ''' - Test 4 - Verify that restoring timeseries data works with sample real data - ''' - # Check how much data there is after - res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) - res_stats_count = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID, "metadata.key" : 'stats/pipeline_time'}) - logging.info(f"Restoring complete: {res} entries restored") - print(f"Restoring complete: {res} entries restored") - - # A single additional entry with key 'stats/pipeline_time' should be present as this test involves running only the restore pipeline - 
print(f"res_stats_count = {res_stats_count}") - self.assertEqual(res_stats_count, 1) - self.assertEqual(res, 1907) - if __name__ == '__main__': etc.configLogging() unittest.main()
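Note on the batching behaviour mentioned in the commit message: the batch_size accepted by load_multi_timeline_for_range is only used to chunk the entries for insertion (via get_load_ranges in emission/purge_restore/import_timeseries.py, shown in the hunk above); it does not cap how much data is processed. Below is a minimal sketch of that chunking, not the full loader: get_load_ranges is copied from the diff, and the 1906-entry / batch-size-500 figures are taken from the tests, with a plain list standing in for real timeseries entries. It assumes a non-empty entry list, as the helper does.

    # Sketch of the chunking done by get_load_ranges (import_timeseries.py).
    # Entry count (1906) and batch size (500) mirror TestPurgeRestoreModule;
    # the integer list is a stand-in for exported timeseries entries.

    def get_load_ranges(entries, batch_size):
        # Indices where each batch starts: 0, batch_size, 2*batch_size, ...
        start_indices = list(range(0, len(entries), batch_size))
        # Pair consecutive start indices into (start, end) ranges ...
        ranges = list(zip(start_indices, start_indices[1:]))
        # ... and close the final, possibly shorter, batch at len(entries).
        ranges.append((start_indices[-1], len(entries)))
        return ranges

    entries = list(range(1906))               # stand-in for exported entries
    ranges = get_load_ranges(entries, batch_size=500)
    print(ranges)                             # [(0, 500), (500, 1000), (1000, 1500), (1500, 1906)]

    # Every entry falls into exactly one range, so batch_size only controls
    # how the inserts are chunked, not how many entries get loaded overall.
    assert sum(end - start for start, end in ranges) == len(entries)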