Added CSV export as an option
The default option for now is JSON, which is easier for data restore.

Export flags are provided as a boolean dictionary; for each flag that is set to True, the corresponding export function is called.
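
A minimal sketch of the flag-dispatch idea described above (illustrative names, not the exact identifiers in the script): each flag set to True selects the matching export function from a dictionary.

def export_as_json(data, filename):
    print("would write {} entries to {}.json".format(len(data), filename))

def export_as_csv(data, filename):
    print("would write {} entries to {}.csv".format(len(data), filename))

# Map flag names to export functions; calling code only toggles the booleans.
export_options = {'json_export': export_as_json, 'csv_export': export_as_csv}
export_flags = {'json_export': True, 'csv_export': False}

data = [{"_id": 1}, {"_id": 2}]           # placeholder entries
filename = "/tmp/old_timeseries_demo"     # placeholder path, no extension

for key, enabled in export_flags.items():
    if enabled:
        export_options[key](data, filename)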
Mahadik, Mukul Chandrakant authored and Mahadik, Mukul Chandrakant committed Jan 9, 2024
1 parent 78979ff commit 2c1ef44
Showing 2 changed files with 44 additions and 17 deletions.
57 changes: 41 additions & 16 deletions bin/purge_user_timeseries.py
@@ -16,44 +16,60 @@
DEFAULT_DIR_NAME = "/tmp"
DEFAULT_FILE_PREFIX = "old_timeseries_"

def exportOldTimeseriesAsCsv(user_id, last_ts_run, dir_name, file_prefix):
    filename = dir_name + "/" + file_prefix + str(user_id) + ".json"
    all_data = list(edb.get_timeseries_db().find({"user_id": user_id, "metadata.write_ts": { "$lt": last_ts_run}}))
    # all_df = pd.json_normalize(all_data)
    # print(all_df)
    # all_df.to_csv(filename)
def exportOldTimeseriesAsCsv(user_id, all_data, filename):
    logging.info("Exporting data to CSV...")
    filename += ".csv"
    all_df = pd.json_normalize(all_data)
    all_df.to_csv(filename)
    logging.info("Old timeseries data exported as CSV to {}".format(filename))

def exportOldTimeseriesAsJson(user_id, all_data, filename):
    logging.info("Exporting data to JSON...")
    def custom_encoder(obj):
        if isinstance(obj, (UUID, ObjectId)):
            return str(obj)
        raise TypeError(f"Type {type(obj)} not serializable")

    filename += ".json"
    with open(filename, 'w') as file:
        json.dump(all_data, file, default=custom_encoder)
    logging.info("Old timeseries data exported as JSON to {}".format(filename))

    logging.info("Old timeseries data exported to {}".format(filename))
exportOptions = {
    'json_export': exportOldTimeseriesAsJson,
    'csv_export': exportOldTimeseriesAsCsv
}

def purgeUserTimeseries(user_uuid, user_email=None, dir_name=DEFAULT_DIR_NAME, file_prefix=DEFAULT_FILE_PREFIX, unsafe_ignore_save=False):
def purgeUserTimeseries(exportFileFlags, user_uuid, user_email=None, dir_name=DEFAULT_DIR_NAME, file_prefix=DEFAULT_FILE_PREFIX, unsafe_ignore_save=False):
    if user_uuid:
        user_id = uuid.UUID(user_uuid)
    else:
        user_id = ecwu.User.fromEmail(user_email).uuid

    cstate = esp.get_current_state(user_id, ecwp.PipelineStages.CREATE_CONFIRMED_OBJECTS)
    last_ts_run = cstate['last_ts_run']
    print(f"last_ts_run : {last_ts_run}")
    logging.info(f"last_ts_run : {last_ts_run}")

    if not last_ts_run:
        logging.warning("No processed timeseries for user {}".format(user_id))
        exit(1)

    filename = dir_name + "/" + file_prefix + str(user_id)
    logging.info("Querying data...")
    all_data = list(edb.get_timeseries_db().find({"user_id": user_id, "metadata.write_ts": { "$lt": last_ts_run}}))
    logging.info("Fetched data...")

    if unsafe_ignore_save is True:
        logging.warning("CSV export was ignored")
    else:
        exportOldTimeseriesAsCsv(user_id, last_ts_run, dir_name, file_prefix)
    else:
        for key in exportFileFlags:
            logging.info(f"{key} = {exportFileFlags[key]}")
            if exportFileFlags[key] is True:
                exportOptions[key](user_id, all_data, filename)

    logging.info("Deleting entries from database...")
    result = edb.get_timeseries_db().delete_many({"user_id": user_id, "metadata.write_ts": { "$lt": last_ts_run}})
    logging.debug("{} deleted entries since {}".format(result.deleted_count, datetime.fromtimestamp(last_ts_run)))
    logging.info("{} deleted entries since {}".format(result.deleted_count, datetime.fromtimestamp(last_ts_run)))

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
@@ -64,19 +80,28 @@ def purgeUserTimeseries(user_uuid, user_email=None, dir_name=DEFAULT_DIR_NAME, f
    group.add_argument("-u", "--user_uuid")
    parser.add_argument(
        "-d", "--dir_name",
        help="Target directory for exported csv data (defaults to {})".format(DEFAULT_DIR_NAME),
        help="Target directory for exported JSON data (defaults to {})".format(DEFAULT_DIR_NAME),
        default=DEFAULT_DIR_NAME
    )
    parser.add_argument(
        "--file_prefix",
        help="File prefix for exported csv data (defaults to {})".format(DEFAULT_FILE_PREFIX),
        help="File prefix for exported JSON data (defaults to {})".format(DEFAULT_FILE_PREFIX),
        default=DEFAULT_FILE_PREFIX
    )
    parser.add_argument(
        "--csv_export",
        help="Export to a CSV file along with the default JSON file",
        action='store_true'
    )
    parser.add_argument(
        "--unsafe_ignore_save",
        help="Ignore csv export of deleted data (not recommended, this operation is definitive)",
        help="Ignore export of deleted data (not recommended, this operation is definitive)",
        action='store_true'
    )

    args = parser.parse_args()
    purgeUserTimeseries(args.user_uuid, args.user_email, args.dir_name, args.file_prefix, args.unsafe_ignore_save)
    exportFileFlags = {
        'json_export': True,
        'csv_export': args.csv_export if args.csv_export is not None else False
    }
    purgeUserTimeseries(exportFileFlags, args.user_uuid, args.user_email, args.dir_name, args.file_prefix, args.unsafe_ignore_save)
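
A hypothetical invocation of the updated purge script, based only on the flags shown in this diff (the UUID and directory are placeholders): JSON is always exported, and --csv_export additionally writes a CSV copy before the matching entries are deleted.

python bin/purge_user_timeseries.py -u 00000000-0000-0000-0000-000000000000 \
    --dir_name /tmp --file_prefix old_timeseries_ --csv_export
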
4 changes: 3 additions & 1 deletion bin/restore_user_timeseries.py
@@ -20,6 +20,7 @@ def restoreUserTimeseries(filename):
    # print(df)
    # result = edb.get_timeseries_db().insert_many(data)

    logging.info("Importing data from file...")
    with open(filename, 'r') as file:
        data = json.load(file)

@@ -28,8 +29,9 @@
document["_id"] = ObjectId(document["_id"])
document["user_id"] = Binary(uuid.UUID(document["user_id"]).bytes, 0x03)

logging.info("Inserting data into database...")
result = edb.get_timeseries_db().insert_many(data)
logging.debug("{} documents successfully inserted".format(len(result.inserted_ids)))
logging.info("{} documents successfully inserted".format(len(result.inserted_ids)))

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
