
Reduced number of files by merging data using batch_size
One observation is that the current logic creates multiple files, which is okay.
However, these files don't really contain many entries.

It could be more efficient to keep storing entries in the same file until a threshold is reached, say 5000 or 10000 (like batch_size in load_multi_timeline_for_range).
If this default threshold batch size isn't reached, keep adding to the same file.
Keep updating end_ts, while start_ts remains the same.
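A rough sketch of that accumulation idea, purely for illustration (the helper name and the 5000 default are assumptions, not the actual implementation):

```python
def group_into_batches(entries, batch_size_limit=5000):
    # Accumulate entries into one "file" until the threshold is reached:
    # start_ts stays fixed at the first entry of the batch, while end_ts
    # keeps advancing to the latest entry added.
    batches = []
    batch = []
    for entry in entries:
        batch.append(entry)
        if len(batch) >= batch_size_limit:
            batches.append((batch[0]['data']['ts'], batch[-1]['data']['ts'], batch))
            batch = []
    if batch:
        # A remainder below the threshold still goes into a single final file.
        batches.append((batch[0]['data']['ts'], batch[-1]['data']['ts'], batch))
    return batches
```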

----

Found an edge case
Incremental export is fine.

Let’s say we have chosen full export.

In the sample data we have 1906 entries.
In batch testing I’m setting batch_size_limit to 500.

Now, when the code executes:
- current_end_ts will be set to initEndTs, which is the current time minus the FUZZ time set by the pipeline queries.
- new_entries will contain all 1906 entries, which is more than the batch_size_limit.
- BOTH the batch_size_limit check and the current_end_ts check will be TRUE.
- It will export the oversized batch of more than the limit and also delete those entries.
- While this seems fine for export, it will cause issues when we attempt to restore data whose size exceeds the batch size (see the sketch after this list).
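For illustration, a minimal sketch of why both checks pass on the very first iteration of a full export (the names mirror purge_data.py; the timestamp value is made up):

```python
batch_size_limit = 500
current_batch_size = 1906       # all sample entries land in the first batch
initEndTs = 1725100000.0        # hypothetical "current time - FUZZ" value
current_end_ts = initEndTs      # a full export sets current_end_ts to initEndTs right away

if current_batch_size >= batch_size_limit or current_end_ts >= initEndTs:
    # Both checks are TRUE on the first pass, so the whole 1906-entry
    # range is exported (and deleted) as one file that exceeds the limit.
    print("flushing oversized batch of", current_batch_size, "entries")
```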

Hence, we need a way to handle this, perhaps by:
- Setting current_end_ts to the ts value of the entry at index batch_size_limit - 1.
- Fetching entries up to this point only.
- Then fetching the next batch of entries.

Essentially, in this scenario, unlike the incremental scenario where we increment current_end_ts by 3600 seconds,
here we need to advance current_end_ts to the ts value of the entry at the next batch_size_limit - 1 index.
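A minimal sketch of that capping logic, assuming new_entries is sorted by data.ts (the helper name is hypothetical):

```python
def cap_end_ts(new_entries, current_end_ts, batch_size_limit):
    # If more entries are pending than fit in one batch, stop the window at
    # the timestamp of the last entry that still fits; otherwise keep the
    # originally computed current_end_ts.
    if len(new_entries) > batch_size_limit:
        return new_entries[batch_size_limit - 1]['data']['ts']
    return current_end_ts
```

The while loop would then export and delete only the entries up to this capped current_end_ts and start the next iteration from there, so no exported file exceeds batch_size_limit entries.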

--------

Working on this, but writing tests for it is still pending.
Also, the batch size is still being exceeded.
Mahadik, Mukul Chandrakant authored and Mahadik, Mukul Chandrakant committed Aug 31, 2024
1 parent 63f7985 commit 34ab73d
Showing 3 changed files with 191 additions and 139 deletions.
2 changes: 1 addition & 1 deletion emission/purge_restore/export_timeseries.py
@@ -61,7 +61,7 @@ def get_from_all_three_sources_with_retry(user_id, in_query, databases=None):

return retry_lists

def export(user_id, ts, start_ts, end_ts, file_name, ma_bool, databases=None):
def export(user_id, ts, start_ts, end_ts, file_name, databases=None):
logging.info("In export: Databases = %s" % databases)
print("In export: Databases = %s" % databases)

155 changes: 75 additions & 80 deletions emission/purge_restore/purge_data.py
@@ -33,6 +33,7 @@ def purge_data(user_id, archive_dir, export_type):
class PurgeDataPipeline:
def __init__(self):
self._last_processed_ts = None
self.batch_size_limit = 10000

@property
def last_processed_ts(self):
@@ -43,10 +44,7 @@ def run_purge_data_pipeline(self, user_id, archive_dir, export_type):
time_query = espq.get_time_range_for_purge_data(user_id)

if archive_dir is None:
if "DATA_DIR" in os.environ:
archive_dir = os.environ['DATA_DIR']
else:
archive_dir = "emission/archived"
archive_dir = os.environ.get('DATA_DIR', "emission/archived")

if os.path.isdir(archive_dir) == False:
os.mkdir(archive_dir)
@@ -56,89 +54,75 @@ def run_purge_data_pipeline(self, user_id, archive_dir, export_type):
print("Inside: purge_data - Start time: %s" % initStartTs)
print("Inside: purge_data - End time: %s" % initEndTs)

export_queries_initial = {
# 'trip_time_query': estt.TimeQuery("data.start_ts", initStartTs, initEndTs),
# 'place_time_query': estt.TimeQuery("data.enter_ts", initStartTs, initEndTs),
'loc_time_query': estt.TimeQuery("data.ts", initStartTs, initEndTs)
}

file_names = []
entries_to_export = self.get_export_timeseries_entries(user_id, ts, time_query.startTs, time_query.endTs, export_queries_initial)
count_entries = len(entries_to_export)

if initStartTs is None:
# If running the pipeline PURGE stage for first time, choosing the first timestamp
# from the timeseries as the starting point
# Else cannot add 1 hour (3600 seconds) to a NoneType value if incremental option is selected
print("Inside: purge_data - entries_to_export = %s" % entries_to_export[0]['data']['ts'])
current_start_ts = entries_to_export[0]['data']['ts']
else:
current_start_ts = initStartTs

while True:
print("Inside while loop: current_start_ts = %s" % current_start_ts)
if export_type == 'full':
current_end_ts = initEndTs
print("Inside export_type full, setting current_end_ts to initEndTs: %s" % current_end_ts)
elif export_type == 'incremental':
if (current_start_ts + 3600) >= initEndTs:
current_end_ts = initEndTs
print("Inside export_type incremental, setting current_end_ts to initEndTs: %s" % current_end_ts)
else:
current_end_ts = current_start_ts + 3600
print("Inside export_type incremental, increasing current_end_ts by 1 hour: %s" % current_end_ts)
else:
raise ValueError("Unknown export_type %s" % export_type)

print(f"Processing data from {current_start_ts} to {current_end_ts}")
entries_to_export = self.get_export_timeseries_entries(user_id, ts, time_query.startTs, time_query.endTs)
# count_entries = len(entries_to_export)

# If running the pipeline PURGE stage for first time, choose the first timestamp from the timeseries as the starting point
# Otherwise cannot add 1 hour (3600 seconds) to a NoneType value if incremental option is selected
current_start_ts = initStartTs if initStartTs is not None else entries_to_export[0]['data']['ts']
batch_start_ts = current_start_ts
current_batch_size = 0
batch_time_ranges = []

while current_start_ts < initEndTs:
current_end_ts = min(current_start_ts + 3600, initEndTs) if export_type == 'incremental' else initEndTs

file_name = archive_dir + "/archive_%s_%s_%s" % (user_id, current_start_ts, current_end_ts)
export_queries = epret.export(user_id, ts, current_start_ts, current_end_ts, file_name, False)
# epret.export(user_id, ts, current_start_ts, current_end_ts, file_name, False)

entries_to_export_1 = self.get_export_timeseries_entries(user_id, ts, time_query.startTs, time_query.endTs, export_queries_initial)
count_entries_1 = len(entries_to_export_1)

if export_queries is None and count_entries_1 > 0:
print("No entries found in current time range from %s to %s" % (current_start_ts, current_end_ts))
print("Incrementing time range by 1 hour")
current_start_ts = current_end_ts
continue
# if count_entries_2 == 0 and count_entries_1 == 0:
elif export_queries is None and count_entries_1 == 0:
# Didn't process anything new so start at the same point next time
# self._last_processed_ts = None
logging.debug("No new data to export, breaking out of while loop")
print("No new data to export, breaking out of while loop")
break

entries_to_export_2 = self.get_export_timeseries_entries(user_id, ts, current_start_ts, current_end_ts, export_queries)
count_entries_2 = len(entries_to_export_2)
print("count_entries_2 = %s" % count_entries_2)

new_entries = self.get_export_timeseries_entries(user_id, ts, current_start_ts, current_end_ts)

logging.debug("Exporting to file: %s" % file_name)
print("Exporting to file: %s" % file_name)
file_names.append(file_name)
print("File names: %s" % file_names)
if new_entries:
current_batch_size += len(new_entries)
logging.debug("Current batch size = %s" % current_batch_size)
print("Current batch size = %s" % current_batch_size)
self._last_processed_ts = new_entries[-1]['data']['ts']
logging.debug(f"Updated last_processed_ts {self._last_processed_ts}")
print(f"Updated last_processed_ts {self._last_processed_ts}")

if current_batch_size >= self.batch_size_limit or current_end_ts >= initEndTs:
batch_time_ranges.append((batch_start_ts, current_end_ts))
print("Exporting batch size of %s entries from %s to %s" % (current_batch_size, batch_start_ts, current_end_ts))
file_names.extend(self.export_batches(user_id, ts, batch_time_ranges, archive_dir))
self.delete_batches(user_id, ts, batch_time_ranges)

batch_time_ranges = []
current_batch_size = 0
batch_start_ts = current_end_ts

# self.export_pipeline_states(user_id, file_name)
self.delete_timeseries_entries(user_id, ts, current_start_ts, current_end_ts, export_queries)
current_start_ts = current_end_ts
else:
remaining_entries = self.get_export_timeseries_entries(user_id, ts, current_start_ts, initEndTs)
if not remaining_entries:
print("No new data to export, breaking out of while loop")
if current_batch_size > 0:
batch_time_ranges.append((batch_start_ts, current_start_ts))
break
else:
print(f"No entries found in current time range from {current_start_ts} to {current_end_ts}")
print("Incrementing time range")
current_start_ts = current_end_ts

print("Total entries to export: %s" % count_entries)
print("Entries exported in timerange %s to %s: %s" % (current_start_ts, current_end_ts, count_entries_2))
print("New count entries to export: %s" % count_entries_1)
self._last_processed_ts = entries_to_export_2[-1]['data']['ts']
print("Updated last_processed_ts %s" % self._last_processed_ts)
if batch_time_ranges:
print("Exporting accumulated entries of batch size of %s entries from %s to %s" % (current_batch_size, batch_start_ts, current_start_ts))
file_names.extend(self.export_batches(user_id, ts, batch_time_ranges, archive_dir))
self.delete_batches(user_id, ts, batch_time_ranges)

current_start_ts = current_end_ts
if current_start_ts >= initEndTs:
break
print(f"Exported data to {len(file_names)} files")
print(f"Exported file names: {file_names}")
return file_names

print("Exported data to %s files" % len(file_names))
print("Exported file names: %s" % file_names)
def export_batches(self, user_id, ts, batch_time_ranges, archive_dir):
file_names = []
for start_ts, end_ts in batch_time_ranges:
file_name = archive_dir + "/archive_%s_%s_%s" % (user_id, start_ts, end_ts)
print(f"Exporting entries from {start_ts} to {end_ts} to file: {file_name}")
epret.export(user_id, ts, start_ts, end_ts, file_name)
file_names.append(file_name)
return file_names

def delete_batches(self, user_id, ts, batch_time_ranges):
for start_ts, end_ts in batch_time_ranges:
self.delete_timeseries_entries(user_id, ts, start_ts, end_ts)

# def export_pipeline_states(self, user_id, file_name):
# pipeline_state_list = list(edb.get_pipeline_state_db().find({"user_id": user_id}))
# logging.info("Found %d pipeline states %s" %
@@ -149,7 +133,8 @@ def run_purge_data_pipeline(self, user_id, archive_dir, export_type):
# json.dump(pipeline_state_list,
# gpfd, default=esj.wrapped_default, allow_nan=False, indent=4)

def delete_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_datetime, export_queries):
def delete_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_datetime):
export_queries = self.get_export_queries(start_ts_datetime, end_ts_datetime)
for key, value in export_queries.items():
ts_query = ts._get_query(time_query=value)
print(ts_query)
@@ -167,8 +152,9 @@ def delete_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_datet
logging.debug("{} deleted entries from {} to {}".format(result.deleted_count, start_ts_datetime, end_ts_datetime))
print("{} deleted entries from {} to {}".format(result.deleted_count, start_ts_datetime, end_ts_datetime))

def get_export_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_datetime, export_queries):
def get_export_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_datetime):
entries_to_export = []
export_queries = self.get_export_queries(start_ts_datetime, end_ts_datetime)
for key, value in export_queries.items():
tq = value
sort_key = ts._get_sort_key(tq)
@@ -177,3 +163,12 @@ def get_export_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_d
logging.debug(f"Key query: {key}")
logging.debug("{} fetched entries from {} to {}".format(ts_db_count, start_ts_datetime, end_ts_datetime))
return entries_to_export

def get_export_queries(self, start_ts, end_ts):
export_queries = {
# 'trip_time_query': estt.TimeQuery("data.start_ts", initStartTs, initEndTs),
# 'place_time_query': estt.TimeQuery("data.enter_ts", initStartTs, initEndTs),
'loc_time_query': estt.TimeQuery("data.ts", start_ts, end_ts)
}
return export_queries

