diff --git a/tr_sys/tr_ars/api.py b/tr_sys/tr_ars/api.py index e74f2ca3..a683912f 100644 --- a/tr_sys/tr_ars/api.py +++ b/tr_sys/tr_ars/api.py @@ -428,33 +428,23 @@ def message(req, key): scorestat = utils.ScoreStatCalc(res) mesg.result_stat = scorestat #before we do basically anything else, we normalize - try: - parent_pk = mesg.ref_id - #message_to_merge =utils.get_safe(data,"message") - message_to_merge = data - agent_name = str(mesg.actor.agent.name) - utils.pre_merge_process(message_to_merge,key, agent_name, inforesid) - if mesg.data and 'results' in mesg.data and mesg.data['results'] != None and len(mesg.data['results']) > 0: - mesg = Message.create(name=mesg.name, status=status, actor=mesg.actor, ref=mesg) - mesg.status = status - mesg.code = code - mesg.data = data - mesg.save() - if agent_name.startswith('ara-'): - logging.debug("Starting merge for "+str(mesg.pk)) - new_merged = utils.merge_received(parent_pk,message_to_merge['message'],agent_name) - logging.debug("Merge complete for "+str(new_merged.pk)) - #the merged versions is what gets consumed. So, it's all we do post processing on? - utils.post_process(new_merged.data,new_merged.id, agent_name) - logging.debug("Post processing complete for "+str(new_merged.pk)) - - except Exception as e: - logger.debug("Problem with merger or post processeing for %s " % key) - logger.exception("error in merger or post processin") - new_merged.status='E' - new_merged.code = 422 - new_merged.save() - else: + parent_pk = mesg.ref_id + #message_to_merge =utils.get_safe(data,"message") + message_to_merge = data + agent_name = str(mesg.actor.agent.name) + utils.pre_merge_process(message_to_merge,key, agent_name, inforesid) + if mesg.data and 'results' in mesg.data and mesg.data['results'] != None and len(mesg.data['results']) > 0: + mesg = Message.create(name=mesg.name, status=status, actor=mesg.actor, ref=mesg) + mesg.status = status + mesg.code = code + mesg.data = data + mesg.save() + logging.info("pre async call") + if agent_name.startswith('ara-'): + utils.merge_and_post_process.apply_async((parent_pk,message_to_merge['message'],agent_name)) + logging.info("post async call") + + # create child message if this one already has results if mesg.data and 'results' in mesg.data and mesg.data['results'] != None and len(mesg.data['results']) > 0: mesg = Message.create(name=mesg.name, status=status, actor=mesg.actor, ref=mesg) diff --git a/tr_sys/tr_ars/tasks.py b/tr_sys/tr_ars/tasks.py index e7f2b9fd..096793b3 100644 --- a/tr_sys/tr_ars/tasks.py +++ b/tr_sys/tr_ars/tasks.py @@ -158,24 +158,22 @@ def send_message(actor_dict, mesg_dict, timeout=300): mesg.url = url mesg.save() logger.debug('+++ message saved: %s' % (mesg.pk)) - try: - agent_name = str(mesg.actor.agent.name) - if mesg.code == 200: - if agent_name.startswith('ara-'): - logging.debug("Merge starting for "+str(mesg.pk)) - new_merged = utils.merge_received(parent_pk,message_to_merge['message'], agent_name) - logging.debug("Merge complete for "+str(new_merged.pk)) - utils.post_process(new_merged.data,new_merged.pk, agent_name) - logging.debug("Post processing done for "+str(new_merged.pk)) - else: - logging.debug("Skipping merge and post for "+str(mesg.pk)+ - " because the contributing message is in state: "+str(mesg.code)) - #This exception relates to the merged version, not the original. Never the two may interfere - except Exception as e: - logger.debug('Problem with post processing or merger of %s for pk: %s' % (inforesid, mesg.pk)) - new_merged.status='E' - new_merged.code = 422 - new_merged.save() + + agent_name = str(mesg.actor.agent.name) + if mesg.code == 200: + logger.info("pre async call") + if agent_name.startswith('ara-'): + # logging.debug("Merge starting for "+str(mesg.pk)) + # new_merged = utils.merge_received(parent_pk,message_to_merge['message'], agent_name) + # logging.debug("Merge complete for "+str(new_merged.pk)) + # utils.post_process(new_merged.data,new_merged.pk, agent_name) + # logging.debug("Post processing done for "+str(new_merged.pk)) + utils.merge_and_post_process.apply_async((parent_pk,message_to_merge['message'],agent_name)) + logger.info("post async call") + else: + logging.debug("Skipping merge and post for "+str(mesg.pk)+ + " because the contributing message is in state: "+str(mesg.code)) + @shared_task(name="catch_timeout") def catch_timeout_async(): diff --git a/tr_sys/tr_ars/utils.py b/tr_sys/tr_ars/utils.py index 66ba2fb6..1a507f0d 100644 --- a/tr_sys/tr_ars/utils.py +++ b/tr_sys/tr_ars/utils.py @@ -588,6 +588,20 @@ def post_process(data,key, agent_name): mesg.data = data mesg.save() +@shared_task(name="merge_and_post_process") +def merge_and_post_process(parent_pk,message_to_merge, agent_name): + #logging.debug("Starting merge for "+str(mesg.pk)) + try: + merged = merge_received(parent_pk,message_to_merge, agent_name) + post_process(merged.data,merged.id, agent_name) + except Exception as e: + logging.debug("Problem with merger or post processeing for %s " % str(parent_pk)) + logging.exception("error in merger or post processing") + merged.status='E' + merged.code = 422 + merged.save() + + def scrub_null_attributes(data): nodes = get_safe(data,"message","knowledge_graph","nodes") edges = get_safe(data,"message","knowledge_graph","edges") @@ -682,10 +696,7 @@ def annotate_nodes(mesg,data,agent_name): else: post_processing_error(mesg,data,"Error in annotation of nodes") except Exception as e: - logging.exception("error in node annotation internal function for agent: %s" % agent_name) - logging.error("Unexpected error 3: {}".format(traceback.format_exception(type(e), e, e.__traceback__))) - logging.error(type(e).__name__) - logging.error(e.args) + logging.exception("error in node annotation internal function") raise e #else: # with open(str(mesg.actor)+".json", "w") as outfile: