Commit

Removed batch size limit
Realized that we might not need the batch size limit at all.
The batch_size default in load_multi_timeline_for_range isn't a hard cutoff that caps how much data gets processed; it just splits the data into batches within the script itself.
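As a quick illustration of that point, here is a minimal sketch using the get_load_ranges helper from the import_timeseries.py diff below; the entry count and batch size are made-up values, not anything from this commit:

```python
# get_load_ranges is copied from emission/purge_restore/import_timeseries.py;
# 1250 entries and batch_size=500 are made-up illustration values.
def get_load_ranges(entries, batch_size):
    start_indices = list(range(0, len(entries), batch_size))
    ranges = list(zip(start_indices, start_indices[1:]))
    ranges.append((start_indices[-1], len(entries)))
    return ranges

entries = list(range(1250))  # stand-in for timeline entries
ranges = get_load_ranges(entries, batch_size=500)
print(ranges)  # [(0, 500), (500, 1000), (1000, 1250)]
# Every entry lands in exactly one range, so batch_size never caps the total.
assert sum(end - start for start, end in ranges) == len(entries)
```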

------

Will clean up code in next commit.
Mahadik, Mukul Chandrakant authored and Mahadik, Mukul Chandrakant committed Aug 31, 2024
1 parent 4ab627b commit 02fb2ce
Showing 5 changed files with 76 additions and 260 deletions.
2 changes: 1 addition & 1 deletion emission/purge_restore/export_timeseries.py
@@ -38,7 +38,7 @@ def get_with_retry(retrieve_call, in_query):
else:
query.startTs = curr_batch[-1][timeTypeSplit[0]][timeTypeSplit[1]]

return list_so_far[:501]
return list_so_far

def get_from_all_three_sources_with_retry(user_id, in_query, databases=None):
import emission.storage.timeseries.builtin_timeseries as estb
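Context for the change above: get_with_retry accumulates batches while advancing query.startTs past each batch's last entry, so the old [:501] cap silently dropped everything after the first 501 entries. A simplified, hypothetical sketch of that loop; retrieve_call, Query, and the entry layout below are stand-ins, not the real API:

```python
class Query:
    """Hypothetical stand-in for the real time query object."""
    def __init__(self, startTs):
        self.startTs = startTs

def fetch_all(retrieve_call, query):
    # Simplified pagination: keep fetching until a batch comes back empty.
    list_so_far = []
    while True:
        curr_batch = retrieve_call(query)
        if not curr_batch:
            break
        list_so_far.extend(curr_batch)
        query.startTs = curr_batch[-1]['data']['ts']  # advance past last entry
    return list_so_far  # was effectively `list_so_far[:501]` before this commit

# Three made-up batches of 300 entries each:
batches = [[{'data': {'ts': t}} for t in range(i, i + 300)] for i in (0, 300, 600)]
retrieve_call = lambda query: batches.pop(0) if batches else []
print(len(fetch_all(retrieve_call, Query(0))))  # 900 -- no longer capped at 501
```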
2 changes: 0 additions & 2 deletions emission/purge_restore/import_timeseries.py
@@ -58,8 +58,6 @@ def get_load_ranges(entries, batch_size):
start_indices = list(range(0, len(entries), batch_size))
ranges = list(zip(start_indices, start_indices[1:]))
ranges.append((start_indices[-1], len(entries)))
print("Start indices are %s" % start_indices)
print("Ranges are %s" % ranges)
return ranges

# def load_pipeline_states(file_prefix, all_uuid_list, continue_on_error, verbose):
215 changes: 74 additions & 141 deletions emission/purge_restore/purge_data.py
@@ -33,7 +33,6 @@ def purge_data(user_id, archive_dir, export_type):
class PurgeDataPipeline:
def __init__(self):
self._last_processed_ts = None
self.batch_size_limit = 10000

@property
def last_processed_ts(self):
@@ -56,158 +55,102 @@ def run_purge_data_pipeline(self, user_id, archive_dir, export_type):

file_names = []
entries_to_export = self.get_export_timeseries_entries(user_id, ts, time_query.startTs, time_query.endTs)
# count_entries = len(entries_to_export)
count_entries = len(entries_to_export)

# If running the pipeline PURGE stage for first time, choose the first timestamp from the timeseries as the starting point
# Otherwise cannot add 1 hour (3600 seconds) to a NoneType value if incremental option is selected
current_start_ts = initStartTs if initStartTs is not None else entries_to_export[0]['data']['ts']
batch_start_ts = current_start_ts
current_batch_size = 0
# batch_time_ranges = []
batch_time_range = ()
entries_to_process = []

while current_start_ts < initEndTs:
# while current_start_ts < initEndTs:
while True:
print("Inside while loop: current_start_ts = %s" % current_start_ts)
current_end_ts = min(current_start_ts + 3600, initEndTs) if export_type == 'incremental' else initEndTs

new_entries = self.get_export_timeseries_entries(user_id, ts, current_start_ts, current_end_ts)

if new_entries:
if current_batch_size + len(new_entries) > self.batch_size_limit:
# If adding new entries exceeds the batch size limit
entries_to_process = new_entries[:self.batch_size_limit - current_batch_size]
current_end_ts = entries_to_process[-1]['data']['ts']
else:
entries_to_process = new_entries

current_batch_size += len(entries_to_process)
self._last_processed_ts = entries_to_process[-1]['data']['ts']
print(f"Updated last_processed_ts {self._last_processed_ts}")

# batch_time_ranges.append((batch_start_ts, current_end_ts))
batch_time_range = (batch_start_ts, current_end_ts)
if export_type == 'incremental':
current_end_ts = min(current_start_ts + 3600, initEndTs)
print("Inside export_type incremental, increasing current_end_ts: %s" % current_end_ts)
elif export_type == 'full':
current_end_ts = initEndTs
print("Inside export_type full, setting current_end_ts to current time: %s" % current_end_ts)
else:
raise ValueError("Unknown export_type %s" % export_type)

print(f"Processing data from {current_start_ts} to {current_end_ts}")

if current_batch_size >= self.batch_size_limit or current_end_ts >= initEndTs:
print("Exporting batch size of %s entries from %s to %s" % (current_batch_size, batch_start_ts, current_end_ts))
# file_names.extend(self.export_batches(user_id, ts, batch_time_ranges, archive_dir))
file_names.extend(self.export_batches(user_id, ts, batch_time_range, archive_dir))
# self.delete_batches(user_id, ts, batch_time_ranges)
# self.delete_batches(user_id, ts, batch_time_range)

ids_to_delete = [entry['_id'] for entry in entries_to_process]
self.delete_timeseries_entries(user_id, ts, ids_to_delete, batch_time_range[0], batch_time_range[1])

# batch_time_ranges = []
batch_time_range = ()
file_name = archive_dir + "/archive_%s_%s_%s" % (user_id, current_start_ts, current_end_ts)
export_queries = epret.export(user_id, ts, current_start_ts, current_end_ts, file_name)
# epret.export(user_id, ts, current_start_ts, current_end_ts, file_name)

current_batch_size = 0
batch_start_ts = current_end_ts
entries_to_export_1 = self.get_export_timeseries_entries(user_id, ts, time_query.startTs, time_query.endTs)
count_entries_1 = len(entries_to_export_1)

if export_queries is None and count_entries_1 > 0:
print("No entries found in current time range from %s to %s" % (current_start_ts, current_end_ts))
print("Incrementing time range by 1 hour")
current_start_ts = current_end_ts
continue
# if count_entries_2 == 0 and count_entries_1 == 0:
elif export_queries is None and count_entries_1 == 0:
# Didn't process anything new so start at the same point next time
# self._last_processed_ts = None
logging.debug("No new data to export, breaking out of while loop")
print("No new data to export, breaking out of while loop")
break

entries_to_export_2 = self.get_export_timeseries_entries(user_id, ts, current_start_ts, current_end_ts)
count_entries_2 = len(entries_to_export_2)
print("count_entries_2 = %s" % count_entries_2)

else:
remaining_entries = self.get_export_timeseries_entries(user_id, ts, current_start_ts, initEndTs)
if not remaining_entries:
print("No new data to export, breaking out of while loop")
if current_batch_size > 0:
# batch_time_ranges.append((batch_start_ts, current_start_ts))
batch_time_range = (batch_start_ts, current_start_ts)
break
else:
print(f"No entries found in current time range from {current_start_ts} to {current_end_ts}")
print("Incrementing time range")
current_start_ts = current_end_ts

# Export any remaining batches
# if batch_time_ranges:
if batch_time_range:
print("Exporting accumulated entries of batch size of %s entries from %s to %s" % (current_batch_size, batch_start_ts, current_start_ts))
# file_names.extend(self.export_batches(user_id, ts, batch_time_ranges, archive_dir))
# self.delete_batches(user_id, ts, batch_time_ranges)
file_names.extend(self.export_batches(user_id, ts, batch_time_range, archive_dir))
# self.delete_batches(user_id, ts, batch_time_range)
ids_to_delete = [entry['_id'] for entry in entries_to_process]
self.delete_timeseries_entries(user_id, ts, ids_to_delete, batch_time_range[0], batch_time_range[1])

logging.debug("Exporting to file: %s" % file_name)
print("Exporting to file: %s" % file_name)
file_names.append(file_name)
print("File names: %s" % file_names)

print(f"Exported data to {len(file_names)} files")
print(f"Exported file names: {file_names}")
return file_names
self.delete_timeseries_entries(user_id, ts, current_start_ts, current_end_ts)

print("Total entries to export: %s" % count_entries)
print("Entries exported in timerange %s to %s: %s" % (current_start_ts, current_end_ts, count_entries_2))
print("New count entries to export: %s" % count_entries_1)
self._last_processed_ts = entries_to_export_2[-1]['data']['ts']
print("Updated last_processed_ts %s" % self._last_processed_ts)

'''
while current_start_ts < initEndTs:
current_end_ts = min(current_start_ts + 3600, initEndTs) if export_type == 'incremental' else initEndTs
new_entries = self.get_export_timeseries_entries(user_id, ts, current_start_ts, current_end_ts)
if new_entries:
current_batch_size += len(new_entries)
logging.debug("Current batch size = %s" % current_batch_size)
print("Current batch size = %s" % current_batch_size)
self._last_processed_ts = new_entries[-1]['data']['ts']
logging.debug(f"Updated last_processed_ts {self._last_processed_ts}")
print(f"Updated last_processed_ts {self._last_processed_ts}")
current_start_ts = current_end_ts
if current_start_ts >= initEndTs:
break

if current_batch_size >= self.batch_size_limit or current_end_ts >= initEndTs:
batch_time_ranges.append((batch_start_ts, current_end_ts))
print("Exporting batch size of %s entries from %s to %s" % (current_batch_size, batch_start_ts, current_end_ts))
file_names.extend(self.export_batches(user_id, ts, batch_time_ranges, archive_dir))
self.delete_batches(user_id, ts, batch_time_ranges)
batch_time_ranges = []
current_batch_size = 0
batch_start_ts = current_end_ts
print("Exported data to %s files" % len(file_names))
print("Exported file names: %s" % file_names)
return file_names

current_start_ts = current_end_ts
else:
remaining_entries = self.get_export_timeseries_entries(user_id, ts, current_start_ts, initEndTs)
if not remaining_entries:
print("No new data to export, breaking out of while loop")
if current_batch_size > 0:
batch_time_ranges.append((batch_start_ts, current_start_ts))
break
else:
print(f"No entries found in current time range from {current_start_ts} to {current_end_ts}")
print("Incrementing time range")
current_start_ts = current_end_ts
# new_entries = self.get_export_timeseries_entries(user_id, ts, current_start_ts, current_end_ts)

if batch_time_ranges:
print("Exporting accumulated entries of batch size of %s entries from %s to %s" % (current_batch_size, batch_start_ts, current_start_ts))
file_names.extend(self.export_batches(user_id, ts, batch_time_ranges, archive_dir))
self.delete_batches(user_id, ts, batch_time_ranges)
print(f"Exported data to {len(file_names)} files")
print(f"Exported file names: {file_names}")
return file_names
'''
# if new_entries:
# self._last_processed_ts = new_entries[-1]['data']['ts']
# print(f"Updated last_processed_ts {self._last_processed_ts}")

def export_batches(self, user_id, ts, batch_time_range, archive_dir):
file_names = []
(start_ts, end_ts) = batch_time_range
file_name = archive_dir + "/archive_%s_%s_%s" % (user_id, start_ts, end_ts)
print(f"Exporting entries from {start_ts} to {end_ts} to file: {file_name}")
epret.export(user_id, ts, start_ts, end_ts, file_name)
file_names.append(file_name)
return file_names
# if current_end_ts >= initEndTs:
# file_name = archive_dir + "/archive_%s_%s_%s" % (user_id, current_start_ts, current_end_ts)
# print(f"Exporting entries from {current_start_ts} to {current_end_ts} to file: {file_name}")
# epret.export(user_id, ts, current_start_ts, current_end_ts, file_name)
# file_names.append(file_name)
# self.delete_timeseries_entries(user_id, ts, current_start_ts, current_end_ts)

# def delete_batches(self, user_id, ts, batch_time_range):
# (start_ts, end_ts) = batch_time_range
# self.delete_timeseries_entries(user_id, ts, start_ts, end_ts)
# current_start_ts = current_end_ts

'''
def export_batches(self, user_id, ts, batch_time_ranges, archive_dir):
file_names = []
for start_ts, end_ts in batch_time_ranges:
file_name = archive_dir + "/archive_%s_%s_%s" % (user_id, start_ts, end_ts)
print(f"Exporting entries from {start_ts} to {end_ts} to file: {file_name}")
epret.export(user_id, ts, start_ts, end_ts, file_name)
file_names.append(file_name)
return file_names
# else:
# remaining_entries = self.get_export_timeseries_entries(user_id, ts, initStartTs, initEndTs)
# if not remaining_entries:
# print("No new data to export, breaking out of while loop")
# break
# else:
# print(f"No entries found in current time range from {current_start_ts} to {current_end_ts}")
# print("Incrementing time range")
# current_start_ts = current_end_ts

def delete_batches(self, user_id, ts, batch_time_ranges):
for start_ts, end_ts in batch_time_ranges:
self.delete_timeseries_entries(user_id, ts, start_ts, end_ts)
'''
# print(f"Exported data to {len(file_names)} files")
# print(f"Exported file names: {file_names}")
# return file_names

# def export_pipeline_states(self, user_id, file_name):
# pipeline_state_list = list(edb.get_pipeline_state_db().find({"user_id": user_id}))
@@ -219,15 +162,6 @@ def delete_batches(self, user_id, ts, batch_time_ranges):
# json.dump(pipeline_state_list,
# gpfd, default=esj.wrapped_default, allow_nan=False, indent=4)

def delete_timeseries_entries(self, user_id, ts, ids_to_delete, start_ts_datetime, end_ts_datetime):
logging.debug("Deleting entries from database...")
print("Deleting entries from database...")
print("Number of entries to delete: %s" % len(ids_to_delete))
result = ts.timeseries_db.delete_many({'_id': {'$in': ids_to_delete}})
logging.debug("{} deleted entries from {} to {}".format(result.deleted_count, start_ts_datetime, end_ts_datetime))
print("{} deleted entries from {} to {}".format(result.deleted_count, start_ts_datetime, end_ts_datetime))

'''
def delete_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_datetime):
export_queries = self.get_export_queries(start_ts_datetime, end_ts_datetime)
for key, value in export_queries.items():
@@ -246,7 +180,6 @@ def delete_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_datetime):
print(f"Key query: {key}")
logging.debug("{} deleted entries from {} to {}".format(result.deleted_count, start_ts_datetime, end_ts_datetime))
print("{} deleted entries from {} to {}".format(result.deleted_count, start_ts_datetime, end_ts_datetime))
'''

def get_export_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_datetime):
entries_to_export = []
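The time-window bookkeeping in run_purge_data_pipeline above is easier to see in isolation. A minimal sketch with made-up timestamps, mirroring the incremental/full branch from the diff ('incremental' walks the range in one-hour steps, 'full' takes the whole range at once):

```python
def next_window_end(current_start_ts, init_end_ts, export_type):
    # Mirrors the branch in run_purge_data_pipeline: 3600 s (1 h) increments
    # for incremental exports, the entire remaining range for full exports.
    if export_type == 'incremental':
        return min(current_start_ts + 3600, init_end_ts)
    elif export_type == 'full':
        return init_end_ts
    else:
        raise ValueError("Unknown export_type %s" % export_type)

start_ts, end_ts = 0.0, 9000.0  # made-up epoch timestamps
while start_ts < end_ts:
    window_end = next_window_end(start_ts, end_ts, 'incremental')
    print("would export and purge [%s, %s)" % (start_ts, window_end))
    start_ts = window_end  # windows: (0, 3600), (3600, 7200), (7200, 9000)
```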
2 changes: 1 addition & 1 deletion emission/purge_restore/restore_data.py
@@ -43,7 +43,7 @@ def run_restore_data_pipeline(self, user_id, file_names):
# self._last_processed_ts = pipelineState["last_processed_ts"]
# logging.debug("Restoring from file, last_processed_ts = %s" % (self._last_processed_ts))
# (tsdb_count, ucdb_count) = lmtfr.load_multi_timeline_for_range(file_prefix=file_name, continue_on_error=True)
(tsdb_count, ucdb_count) = eprit.load_multi_timeline_for_range(file_prefix=file_name, continue_on_error=True, batch_size=500)
(tsdb_count, ucdb_count) = eprit.load_multi_timeline_for_range(file_prefix=file_name, continue_on_error=True)
print("After load, tsdb_count = %s, ucdb_count = %s" % (tsdb_count, ucdb_count))
if tsdb_count == 0:
# Didn't process anything new so start at the same point next time
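Consistent with the commit message, dropping batch_size=500 here should change only how the loader chunks its inserts, not how much it loads. A hedged sketch of that generic pattern; load_entries and its default are stand-ins, not the real load_multi_timeline_for_range:

```python
def load_entries(entries, batch_size=10000):
    # Stand-in loader: batch_size only controls the chunking of inserts.
    loaded = 0
    for start in range(0, len(entries), batch_size):
        batch = entries[start:start + batch_size]
        loaded += len(batch)  # imagine a bulk DB insert of `batch` here
    return loaded

entries = list(range(1234))  # made-up entry count
assert load_entries(entries, batch_size=500) == 1234  # explicit batch size
assert load_entries(entries) == 1234  # default batching: same total loaded
```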
