From a8c7dadac63480745c88b8b1cc643df6fc70fa7b Mon Sep 17 00:00:00 2001 From: Shankari Date: Wed, 16 Aug 2023 08:19:54 -0700 Subject: [PATCH] =?UTF-8?q?=F0=9F=A6=95=20Rename=20all=20the=20variables?= =?UTF-8?q?=20to=20be=20more=20generic=20and=20remove=20support=20for=20"a?= =?UTF-8?q?ll"=20users?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Since the "all user" branch does not currently support checking for parallel pipeline runs. We can add that when we handle parallel runs properly, or can turn the parallel pipeline off while resetting --- ...elete_single_pipeline_state_and_objects.py | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/bin/monitor/delete_single_pipeline_state_and_objects.py b/bin/monitor/delete_single_pipeline_state_and_objects.py index 031f323bf..c011afc1e 100644 --- a/bin/monitor/delete_single_pipeline_state_and_objects.py +++ b/bin/monitor/delete_single_pipeline_state_and_objects.py @@ -60,10 +60,12 @@ def _email_2_user_list(email_list): # Options corresponding to # https://github.com/e-mission/e-mission-server/issues/333#issuecomment-312464984 group = parser.add_mutually_exclusive_group(required=True) - group.add_argument("-a", "--all", action="store_true", default=False, - help="reset the pipeline for all users") group.add_argument("-p", "--platform", choices = ['android', 'ios'], help="reset the pipeline for all on the specified platform") + # Add this back when we have the ability to turn off the pipeline + # or we handle parallel pipeline runs (by checking `curr_run_ts` properly below) + # group.add_argument("-a", "--all", action="store_true", default=False, + # help="reset the pipeline for all users") group.add_argument("-u", "--user_list", nargs='+', help="user ids to reset the pipeline for") group.add_argument("-e", "--email_list", nargs='+', @@ -82,9 +84,9 @@ def _email_2_user_list(email_list): pipeline_query = {"pipeline_stage": ewps.PipelineStages[args.state].value} trip_query = {"metadata.key": {"$in": args.del_objects}} if args.all: - all_composite_states = list(edb.get_pipeline_state_db().find(pipeline_query)) - logging.info(f"About to delete {len(all_composite_states)} entries for {ewps.PipelineStages.CREATE_COMPOSITE_OBJECTS}") - logging.debug(f"Full list is {all_composite_states}") + all_matching_states = list(edb.get_pipeline_state_db().find(pipeline_query)) + logging.info(f"About to delete {len(all_matching_states)} entries for {args.state}") + logging.debug(f"Full list is {all_matching_states}") logging.info(f"About to delete {edb.get_analysis_timeseries_db().count_documents(trip_query)} trips") if not args.dry_run: logging.info(f"Pipeline delete result is {edb.get_pipeline_state_db().delete_many(pipeline_query).raw_result}") @@ -96,11 +98,11 @@ def _email_2_user_list(email_list): for user_id in user_list: pipeline_query['user_id'] = user_id trip_query['user_id'] = user_id - user_composite_states = list(edb.get_pipeline_state_db().find(pipeline_query)) - logging.info(f"found {len(user_composite_states)} for user {user_id}") - assert len(user_composite_states) == 1 - print("current pipeline state = %s" % user_composite_states) - if user_composite_states[0].get('curr_run_ts', None) is not None: + user_matching_states = list(edb.get_pipeline_state_db().find(pipeline_query)) + logging.info(f"found {len(user_matching_states)} for user {user_id}") + assert len(user_matching_states) == 1 + print("current pipeline state = %s" % user_matching_states) + if user_matching_states[0].get('curr_run_ts', None) is not None: logging.info(f"Skipping {user_id=} since the pipeline is currently running ") continue edb.get_pipeline_state_db().update_one(pipeline_query, {"$set": {"curr_run_ts": "MANUAL_RESET"}})