Skip to content

Commit

Permalink
🦕 Rename all the variables to be more generic and remove support for …
Browse files Browse the repository at this point in the history
…"all" users

Since the "all user" branch does not currently support checking for parallel
pipeline runs.

We can add that when we handle parallel runs properly, or can turn the parallel
pipeline off while resetting
  • Loading branch information
shankari committed Aug 16, 2023
1 parent cd6ad00 commit a8c7dad
Showing 1 changed file with 12 additions and 10 deletions.
22 changes: 12 additions & 10 deletions bin/monitor/delete_single_pipeline_state_and_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,12 @@ def _email_2_user_list(email_list):
# Options corresponding to
# https://github.com/e-mission/e-mission-server/issues/333#issuecomment-312464984
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("-a", "--all", action="store_true", default=False,
help="reset the pipeline for all users")
group.add_argument("-p", "--platform", choices = ['android', 'ios'],
help="reset the pipeline for all on the specified platform")
# Add this back when we have the ability to turn off the pipeline
# or we handle parallel pipeline runs (by checking `curr_run_ts` properly below)
# group.add_argument("-a", "--all", action="store_true", default=False,
# help="reset the pipeline for all users")
group.add_argument("-u", "--user_list", nargs='+',
help="user ids to reset the pipeline for")
group.add_argument("-e", "--email_list", nargs='+',
Expand All @@ -82,9 +84,9 @@ def _email_2_user_list(email_list):
pipeline_query = {"pipeline_stage": ewps.PipelineStages[args.state].value}
trip_query = {"metadata.key": {"$in": args.del_objects}}
if args.all:
all_composite_states = list(edb.get_pipeline_state_db().find(pipeline_query))
logging.info(f"About to delete {len(all_composite_states)} entries for {ewps.PipelineStages.CREATE_COMPOSITE_OBJECTS}")
logging.debug(f"Full list is {all_composite_states}")
all_matching_states = list(edb.get_pipeline_state_db().find(pipeline_query))
logging.info(f"About to delete {len(all_matching_states)} entries for {args.state}")
logging.debug(f"Full list is {all_matching_states}")
logging.info(f"About to delete {edb.get_analysis_timeseries_db().count_documents(trip_query)} trips")
if not args.dry_run:
logging.info(f"Pipeline delete result is {edb.get_pipeline_state_db().delete_many(pipeline_query).raw_result}")
Expand All @@ -96,11 +98,11 @@ def _email_2_user_list(email_list):
for user_id in user_list:
pipeline_query['user_id'] = user_id
trip_query['user_id'] = user_id
user_composite_states = list(edb.get_pipeline_state_db().find(pipeline_query))
logging.info(f"found {len(user_composite_states)} for user {user_id}")
assert len(user_composite_states) == 1
print("current pipeline state = %s" % user_composite_states)
if user_composite_states[0].get('curr_run_ts', None) is not None:
user_matching_states = list(edb.get_pipeline_state_db().find(pipeline_query))
logging.info(f"found {len(user_matching_states)} for user {user_id}")
assert len(user_matching_states) == 1
print("current pipeline state = %s" % user_matching_states)
if user_matching_states[0].get('curr_run_ts', None) is not None:
logging.info(f"Skipping {user_id=} since the pipeline is currently running ")
continue
edb.get_pipeline_state_db().update_one(pipeline_query, {"$set": {"curr_run_ts": "MANUAL_RESET"}})
Expand Down

0 comments on commit a8c7dad

Please sign in to comment.