diff --git a/emission/purge_restore/export_timeseries.py b/emission/purge_restore/export_timeseries.py
index 69e983ead..d7ebafd25 100644
--- a/emission/purge_restore/export_timeseries.py
+++ b/emission/purge_restore/export_timeseries.py
@@ -37,7 +37,8 @@ def get_with_retry(retrieve_call, in_query):
             done = True
         else:
             query.startTs = curr_batch[-1][timeTypeSplit[0]][timeTypeSplit[1]]
-    return list_so_far
+
+    return list_so_far[:501]
 
 def get_from_all_three_sources_with_retry(user_id, in_query, databases=None):
     import emission.storage.timeseries.builtin_timeseries as estb
diff --git a/emission/purge_restore/import_timeseries.py b/emission/purge_restore/import_timeseries.py
index 6870ca5a3..22626e1b0 100644
--- a/emission/purge_restore/import_timeseries.py
+++ b/emission/purge_restore/import_timeseries.py
@@ -58,6 +58,8 @@ def get_load_ranges(entries, batch_size):
     start_indices = list(range(0, len(entries), batch_size))
     ranges = list(zip(start_indices, start_indices[1:]))
    ranges.append((start_indices[-1], len(entries)))
+    print("Start indices are %s" % start_indices)
+    print("Ranges are %s" % ranges)
     return ranges
 
 # def load_pipeline_states(file_prefix, all_uuid_list, continue_on_error, verbose):
diff --git a/emission/purge_restore/purge_data.py b/emission/purge_restore/purge_data.py
index cde3c1981..51fb70acd 100644
--- a/emission/purge_restore/purge_data.py
+++ b/emission/purge_restore/purge_data.py
@@ -63,8 +63,78 @@ def run_purge_data_pipeline(self, user_id, archive_dir, export_type):
         current_start_ts = initStartTs if initStartTs is not None else entries_to_export[0]['data']['ts']
         batch_start_ts = current_start_ts
         current_batch_size = 0
-        batch_time_ranges = []
+        # batch_time_ranges = []
+        batch_time_range = ()
+        entries_to_process = []
+        while current_start_ts < initEndTs:
+            current_end_ts = min(current_start_ts + 3600, initEndTs) if export_type == 'incremental' else initEndTs
+
+            new_entries = self.get_export_timeseries_entries(user_id, ts, current_start_ts, current_end_ts)
+
+            if new_entries:
+                if current_batch_size + len(new_entries) > self.batch_size_limit:
+                    # If adding new entries exceeds the batch size limit
+                    entries_to_process = new_entries[:self.batch_size_limit - current_batch_size]
+                    current_end_ts = entries_to_process[-1]['data']['ts']
+                else:
+                    entries_to_process = new_entries
+
+                current_batch_size += len(entries_to_process)
+                self._last_processed_ts = entries_to_process[-1]['data']['ts']
+                print(f"Updated last_processed_ts {self._last_processed_ts}")
+
+                # batch_time_ranges.append((batch_start_ts, current_end_ts))
+                batch_time_range = (batch_start_ts, current_end_ts)
+
+                if current_batch_size >= self.batch_size_limit or current_end_ts >= initEndTs:
+                    print("Exporting batch size of %s entries from %s to %s" % (current_batch_size, batch_start_ts, current_end_ts))
+                    # file_names.extend(self.export_batches(user_id, ts, batch_time_ranges, archive_dir))
+                    file_names.extend(self.export_batches(user_id, ts, batch_time_range, archive_dir))
+                    # self.delete_batches(user_id, ts, batch_time_ranges)
+                    # self.delete_batches(user_id, ts, batch_time_range)
+
+                    ids_to_delete = [entry['_id'] for entry in entries_to_process]
+                    self.delete_timeseries_entries(user_id, ts, ids_to_delete, batch_time_range[0], batch_time_range[1])
+
+                    # batch_time_ranges = []
+                    batch_time_range = ()
+
+                    current_batch_size = 0
+                    batch_start_ts = current_end_ts
+
+                current_start_ts = current_end_ts
+
+            else:
+                remaining_entries = self.get_export_timeseries_entries(user_id, ts, current_start_ts, initEndTs)
+                if not remaining_entries:
+                    print("No new data to export, breaking out of while loop")
+                    if current_batch_size > 0:
+                        # batch_time_ranges.append((batch_start_ts, current_start_ts))
+                        batch_time_range = (batch_start_ts, current_start_ts)
+                    break
+                else:
+                    print(f"No entries found in current time range from {current_start_ts} to {current_end_ts}")
+                    print("Incrementing time range")
+                    current_start_ts = current_end_ts
+
+        # Export any remaining batches
+        # if batch_time_ranges:
+        if batch_time_range:
+            print("Exporting accumulated entries of batch size of %s entries from %s to %s" % (current_batch_size, batch_start_ts, current_start_ts))
+            # file_names.extend(self.export_batches(user_id, ts, batch_time_ranges, archive_dir))
+            # self.delete_batches(user_id, ts, batch_time_ranges)
+            file_names.extend(self.export_batches(user_id, ts, batch_time_range, archive_dir))
+            # self.delete_batches(user_id, ts, batch_time_range)
+            ids_to_delete = [entry['_id'] for entry in entries_to_process]
+            self.delete_timeseries_entries(user_id, ts, ids_to_delete, batch_time_range[0], batch_time_range[1])
+
+        print(f"Exported data to {len(file_names)} files")
+        print(f"Exported file names: {file_names}")
+        return file_names
+
+
+        '''
 
         while current_start_ts < initEndTs:
             current_end_ts = min(current_start_ts + 3600, initEndTs) if export_type == 'incremental' else initEndTs
 
@@ -105,11 +175,26 @@ def run_purge_data_pipeline(self, user_id, archive_dir, export_type):
             print("Exporting accumulated entries of batch size of %s entries from %s to %s" % (current_batch_size, batch_start_ts, current_start_ts))
             file_names.extend(self.export_batches(user_id, ts, batch_time_ranges, archive_dir))
             self.delete_batches(user_id, ts, batch_time_ranges)
-
+
         print(f"Exported data to {len(file_names)} files")
         print(f"Exported file names: {file_names}")
         return file_names
 
+        '''
+    def export_batches(self, user_id, ts, batch_time_range, archive_dir):
+        file_names = []
+        (start_ts, end_ts) = batch_time_range
+        file_name = archive_dir + "/archive_%s_%s_%s" % (user_id, start_ts, end_ts)
+        print(f"Exporting entries from {start_ts} to {end_ts} to file: {file_name}")
+        epret.export(user_id, ts, start_ts, end_ts, file_name)
+        file_names.append(file_name)
+        return file_names
+
+    # def delete_batches(self, user_id, ts, batch_time_range):
+    #     (start_ts, end_ts) = batch_time_range
+    #     self.delete_timeseries_entries(user_id, ts, start_ts, end_ts)
+
+    '''
     def export_batches(self, user_id, ts, batch_time_ranges, archive_dir):
         file_names = []
         for start_ts, end_ts in batch_time_ranges:
@@ -122,6 +207,7 @@ def export_batches(self, user_id, ts, batch_time_ranges, archive_dir):
     def delete_batches(self, user_id, ts, batch_time_ranges):
         for start_ts, end_ts in batch_time_ranges:
             self.delete_timeseries_entries(user_id, ts, start_ts, end_ts)
+    '''
 
     # def export_pipeline_states(self, user_id, file_name):
     #     pipeline_state_list = list(edb.get_pipeline_state_db().find({"user_id": user_id}))
@@ -133,6 +219,15 @@ def delete_batches(self, user_id, ts, batch_time_ranges):
     #             json.dump(pipeline_state_list,
     #                 gpfd, default=esj.wrapped_default, allow_nan=False, indent=4)
 
+    def delete_timeseries_entries(self, user_id, ts, ids_to_delete, start_ts_datetime, end_ts_datetime):
+        logging.debug("Deleting entries from database...")
+        print("Deleting entries from database...")
+        print("Number of entries to delete: %s" % len(ids_to_delete))
+        result = ts.timeseries_db.delete_many({'_id': {'$in': ids_to_delete}})
+        logging.debug("{} deleted entries from {} to {}".format(result.deleted_count, start_ts_datetime, end_ts_datetime))
+        print("{} deleted entries from {} to {}".format(result.deleted_count, start_ts_datetime, end_ts_datetime))
+
+    '''
     def delete_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_datetime):
         export_queries = self.get_export_queries(start_ts_datetime, end_ts_datetime)
         for key, value in export_queries.items():
@@ -151,7 +246,8 @@ def delete_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_datet
                 print(f"Key query: {key}")
             logging.debug("{} deleted entries from {} to {}".format(result.deleted_count, start_ts_datetime, end_ts_datetime))
             print("{} deleted entries from {} to {}".format(result.deleted_count, start_ts_datetime, end_ts_datetime))
-
+    '''
+
     def get_export_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_datetime):
         entries_to_export = []
         export_queries = self.get_export_queries(start_ts_datetime, end_ts_datetime)
diff --git a/emission/purge_restore/restore_data.py b/emission/purge_restore/restore_data.py
index 2758336a9..e541b4790 100644
--- a/emission/purge_restore/restore_data.py
+++ b/emission/purge_restore/restore_data.py
@@ -43,7 +43,7 @@ def run_restore_data_pipeline(self, user_id, file_names):
             # self._last_processed_ts = pipelineState["last_processed_ts"]
             # logging.debug("Restoring from file, last_processed_ts = %s" % (self._last_processed_ts))
             # (tsdb_count, ucdb_count) = lmtfr.load_multi_timeline_for_range(file_prefix=file_name, continue_on_error=True)
-            (tsdb_count, ucdb_count) = eprit.load_multi_timeline_for_range(file_prefix=file_name, continue_on_error=True)
+            (tsdb_count, ucdb_count) = eprit.load_multi_timeline_for_range(file_prefix=file_name, continue_on_error=True, batch_size=500)
             print("After load, tsdb_count = %s, ucdb_count = %s" % (tsdb_count, ucdb_count))
             if tsdb_count == 0:
                 # Didn't process anything new so start at the same point next time
diff --git a/emission/tests/exportTests/TestPurgeRestoreModule.py b/emission/tests/exportTests/TestPurgeRestoreModule.py
index 85a1521ee..16825e4d2 100644
--- a/emission/tests/exportTests/TestPurgeRestoreModule.py
+++ b/emission/tests/exportTests/TestPurgeRestoreModule.py
@@ -221,6 +221,63 @@ def testPurgeRestorePipelineIncremental(self):
         self.assertEqual(res_stats_count, 2)
         self.assertEqual(res, 1908)
 
+    def testPurgeRestorePipelineFullBatchExport(self):
+        pdp = eprpd.PurgeDataPipeline()
+        pdp.batch_size_limit = 500
+
+        '''
+        Test 1 - Verify that purging timeseries data works with sample real data
+        '''
+        # Check how much data there was before
+        res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID})
+        logging.info(f"About to purge {res} entries")
+        print(f"About to purge {res} entries")
+        self.assertEqual(res, 1906)
+
+        # Run the purge pipeline
+        file_names = pdp.run_purge_data_pipeline(self.testUUID, os.environ.get('DATA_DIR', 'emission/archived'), "full")
+        print("Exported file names: %s" % file_names)
+
+        '''
+        Test 2 - Assert the files exist after the export process
+        '''
+        # Sample data has 1906 raw entries, batch size limit is 500
+        # We should have (1906 / 500) + 1 = 3 + 1 = 4 files
+        # (1 extra file for the remaining 406 entries in the last batch)
+        self.assertEqual(len(file_names), 4)
+        for file_name in file_names:
+            self.assertTrue(pl.Path(file_name + ".gz").is_file())
+
+        '''
+        Test 3 - Verify that purging timeseries data works with sample real data
+        '''
+        # Check how much data there is after
+        entries = edb.get_timeseries_db().find({"user_id" : self.testUUID})
+        res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID})
+        logging.info(f"Purging complete: {res} entries remaining")
+        print(f"Purging complete: {res} entries remaining")
+        self.assertEqual(res, 0)
+
+        # Run the restore pipeline
+        logging.info(f"About to restore entries")
+        print(f"About to restore entries")
+        epr.run_restore_pipeline_for_user(self.testUUID, file_names)
+
+        '''
+        Test 4 - Verify that restoring timeseries data works with sample real data
+        '''
+        # Check how much data there is after
+        res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID})
+        res_stats_count = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID, "metadata.key" : 'stats/pipeline_time'})
+        logging.info(f"Restoring complete: {res} entries restored")
+        print(f"Restoring complete: {res} entries restored")
+
+        # A single additional entry with key 'stats/pipeline_time' should be present as this test involves running only the restore pipeline
+        print(f"res_stats_count = {res_stats_count}")
+        self.assertEqual(res_stats_count, 1)
+        self.assertEqual(res, 1907)
+
+
     def testPurgeRestorePipelineIncrementalBatchExport(self):
         pdp = eprpd.PurgeDataPipeline()
         pdp.batch_size_limit = 500