Improved Invalidating Raw Entry
Replaced the per-entry DB update loop with a single bulk_write call
humbleOldSage committed Jan 17, 2024
1 parent 54659fb commit ca62c72
Showing 2 changed files with 30 additions and 7 deletions.
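
The core of the change, distilled from the diff below: instead of issuing one update_one round trip per entry, build a list of UpdateOne operations and hand them to a single bulk_write call. A minimal standalone sketch; the connection string, database, and collection names are placeholders, not taken from the commit:

    from pymongo import MongoClient, UpdateOne

    # Placeholder connection and collection; the actual code operates on
    # the e-mission timeseries database.
    coll = MongoClient("mongodb://localhost:27017")["test_db"]["timeseries"]
    id_list = [doc["_id"] for doc in coll.find({}, {"_id": 1}).limit(100)]

    # Before: one network round trip per document.
    #   for oid in id_list:
    #       coll.update_one({"_id": oid}, {"$set": {"invalid": True}})

    # After: the same updates batched into a single bulk_write call.
    ops = [UpdateOne({"_id": oid}, {"$set": {"invalid": True}}) for oid in id_list]
    if ops:  # guard: pymongo raises InvalidOperation on an empty request list
        result = coll.bulk_write(ops)
        print(result.modified_count)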
12 changes: 8 additions & 4 deletions emission/analysis/intake/segmentation/trip_segmentation.py
@@ -7,7 +7,10 @@
 from builtins import *
 from builtins import object
 import logging
-
+import pandas as pd
+logger = logging.getLogger("")
+logger.setLevel(logging.DEBUG)
+from timeit import default_timer as timer
 import emission.storage.timeseries.abstract_timeseries as esta
 import emission.storage.decorations.place_queries as esdp
 import emission.storage.decorations.analysis_timeseries_queries as esda
@@ -70,7 +73,6 @@ def segment_current_trips(user_id):
         logging.debug("len(loc_df) == 0, early return")
         epq.mark_segmentation_done(user_id, None)
         return
-
     out_of_order_points = loc_df[loc_df.ts.diff() < 0]
     if len(out_of_order_points) > 0:
         logging.info("Found out of order points!")
@@ -81,8 +83,10 @@
         # invalidate in the database.
         out_of_order_id_list = out_of_order_points["_id"].tolist()
         logging.debug("out_of_order_id_list = %s" % out_of_order_id_list)
-        for ooid in out_of_order_id_list:
-            ts.invalidate_raw_entry(ooid)
+        # start = timer()
+        ts.invalidate_raw_entry(out_of_order_id_list)
+        # end = timer()
+        # logger.debug(f"Elapsed: {end - start}")
 
         filters_in_df = loc_df["filter"].dropna().unique()
         logging.debug("Filters in the dataframe = %s" % filters_in_df)
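The commented-out timer() lines in the hunk above sketch how the change was benchmarked. A runnable version of that measurement pattern, with a placeholder workload standing in for the actual invalidation call:

    import logging
    from timeit import default_timer as timer

    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger("")

    start = timer()
    # Placeholder workload; in the commit this is ts.invalidate_raw_entry(...).
    total = sum(i * i for i in range(1_000_000))
    end = timer()
    logger.debug(f"Elapsed: {end - start}")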
25 changes: 22 additions & 3 deletions emission/storage/timeseries/builtin_timeseries.py
@@ -9,7 +9,7 @@
 import pandas as pd
 import pymongo
 import itertools
-
+from pymongo import UpdateOne
 import emission.core.get_database as edb
 import emission.storage.timeseries.abstract_timeseries as esta
 
@@ -440,8 +440,27 @@ def update_data(user_id, key, obj_id, data):
         logging.debug("updating entry %s into timeseries" % new_entry)
         edb.save(ts.get_timeseries_db(key), new_entry)
 
-    def invalidate_raw_entry(self, obj_id):
-        self.timeseries_db.update_one({"_id": obj_id, "user_id": self.user_id}, {"$set": {"invalid": True}})
+    def invalidate_raw_entry(self, out_of_order_id_list):
+        update_operations = [
+            UpdateOne({"_id": obj_id, "user_id": self.user_id}, {"$set": {"invalid": True}})
+            for obj_id in out_of_order_id_list
+        ]
+        if update_operations:
+            self.timeseries_db.bulk_write(update_operations)
+
+    ## Another option would be to issue the writes in batches, as below.
+    # However, I couldn't find any timing improvement for update_operations
+    # lists of around 3000 entries.
+    #
+    # def invalidate_raw_entry(self, out_of_order_id_list):
+    #     update_operations = [
+    #         UpdateOne({"_id": obj_id, "user_id": self.user_id}, {"$set": {"invalid": True}})
+    #         for obj_id in out_of_order_id_list
+    #     ]
+    #     batch_size = 1000
+    #     for i in range(0, len(update_operations), batch_size):
+    #         batch = update_operations[i:i + batch_size]
+    #         self.timeseries_db.bulk_write(batch)
 
     def find_entries_count(self, key_list = None, time_query = None, geo_query = None, extra_query_list = None):
         """
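For reference, the caller-side effect of the new signature, sketched as a hypothetical helper; the get_time_series lookup mirrors how trip_segmentation.py obtains its timeseries handle:

    import emission.storage.timeseries.abstract_timeseries as esta

    def invalidate_out_of_order(user_id, out_of_order_id_list):
        """Hypothetical helper showing the new call pattern."""
        ts = esta.TimeSeries.get_time_series(user_id)
        # One call for the whole list replaces the old per-ID loop; an empty
        # list is a no-op, since invalidate_raw_entry only calls bulk_write
        # when update_operations is non-empty.
        ts.invalidate_raw_entry(out_of_order_id_list)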
