From 6da59b203970371bc688e9976ba44c3f59f85b9b Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Tue, 6 Aug 2024 10:26:08 +0200 Subject: [PATCH] split connections to listener and active, functions to fallback to https --- workflow/lib/idds/iworkflow/asyncresult.py | 312 +++++++++++++++++++-- 1 file changed, 284 insertions(+), 28 deletions(-) diff --git a/workflow/lib/idds/iworkflow/asyncresult.py b/workflow/lib/idds/iworkflow/asyncresult.py index f8a132cb..ae55569e 100644 --- a/workflow/lib/idds/iworkflow/asyncresult.py +++ b/workflow/lib/idds/iworkflow/asyncresult.py @@ -9,15 +9,23 @@ # - Wen Guan, , 2023 - 2024 import logging +import os import random import socket -import stomp import threading import time import traceback +import uuid from queue import Queue +try: + import stomp + with_stomp = True +except Exception as ex: + print(f"Failed to import stomp, with_stomp is False: {ex}") + with_stomp = False + from idds.common.constants import WorkflowType, GracefulEvent from idds.common.utils import json_dumps, json_loads, setup_logging, get_unique_id_for_dict from .base import Base @@ -149,12 +157,18 @@ def __init__(self, work_context, name=None, wait_num=1, wait_keys=[], multi_jobs if internal_id: self.internal_id = internal_id self._work_context = work_context - self._work_context.init_brokers() + try: + self._work_context.init_brokers() + self._broker_initialized = True + except Exception as ex: + logging.warn(f"Failed to initialize messaging broker, will use Rest: {ex}") + self._broker_initialized = False self._name = name self._queue = Queue() self._connections = [] + self._subscribe_connections = [] self._graceful_stop = False self._subscribe_thread = None self._subscribed = False @@ -179,6 +193,28 @@ def __init__(self, work_context, name=None, wait_num=1, wait_keys=[], multi_jobs self._nologs = False + self._num_stomp_failures = 0 + self._max_stomp_failures = 5 + try: + max_stomp_failures = os.environ.get("AYNC_RESULT_MAX_STOMP_FAILURES", None) + if max_stomp_failures: + max_stomp_failures = int(max_stomp_failures) + self._max_stomp_failures = max_stomp_failures + except Exception: + pass + + self._poll_period = 300 + try: + poll_period = os.environ.get("AYNC_RESULT_POLL_PERIOD", None) + if poll_period: + poll_period = int(poll_period) + self._poll_period = poll_period + except Exception: + pass + + self._is_messaging_ok = True + self._is_polling_ok = True + @property def logger(self): return logging.getLogger(self.__class__.__name__) @@ -340,8 +376,42 @@ def disconnect(self): con.disconnect() except Exception: pass + self._connections = [] + + def disconnect_subscribe(self): + for con in self._subscribe_connections: + try: + if con.is_connected(): + con.disconnect() + except Exception: + pass + self._subscribe_connections = [] + + def has_connections(self, conns): + if conns: + for con in conns: + try: + if con.is_connected(): + return True + except Exception: + pass + return False + + def get_connections(self, conns): + if conns: + for con in conns: + try: + if con.is_connected(): + return con + except Exception: + pass + return None def connect_to_messaging_broker(self): + conn = self.get_connections(self._connections) + if conn: + return conn + workflow_context = self._work_context brokers = workflow_context.brokers @@ -362,7 +432,10 @@ def connect_to_messaging_broker(self): self._connections = [conn] return conn - def subscribe_to_messaging_brokers(self): + def subscribe_to_messaging_brokers(self, force=False): + if self._subscribed and not force and self._subscribe_connections: + return self._subscribe_connections + workflow_context = self._work_context brokers = workflow_context.brokers conns = [] @@ -384,7 +457,7 @@ def subscribe_to_messaging_brokers(self): timeout = workflow_context.broker_timeout - self.disconnect() + self.disconnect_subscribe() listener = MessagingListener(brokers, self._queue, logger=self.logger) conns = [] @@ -416,11 +489,11 @@ def subscribe_to_messaging_brokers(self): ack='auto', headers=subscribe_selector) self.logger.info("subscribe to %s:%s with selector: %s" % (broker, port, subscribe_selector)) conns.append(conn) - self._connections = conns + self._subscribe_connections = conns return conns - def publish(self, ret, key=None): - conn = self.connect_to_messaging_broker() + def get_message(self, ret, key=None): + message = {} workflow_context = self._work_context if key is None: if self._current_job_kwargs: @@ -433,42 +506,208 @@ def publish(self, ret, key=None): 'type': 'iworkflow', 'internal_id': str(self.internal_id), 'request_id': workflow_context.request_id} - body = json_dumps({'ret': ret, 'key': key, 'internal_id': self.internal_id}) + body = json_dumps({'ret': ret, 'key': key, 'internal_id': self.internal_id, 'type': 'iworkflow', + 'request_id': workflow_context.request_id}) + message = {"headers": headers, "body": body} + elif workflow_context.workflow_type == WorkflowType.iWork: + headers = {'persistent': 'true', + 'type': 'iwork', + 'internal_id': str(self.internal_id), + 'request_id': workflow_context.request_id, + 'transform_id': workflow_context.transform_id} + body = json_dumps({'ret': ret, 'key': key, 'internal_id': self.internal_id, 'type': 'iwork', + 'request_id': workflow_context.request_id, + 'transform_id': workflow_context.transform_id}) + message = {"headers": headers, "body": body} + return message + + def publish_message(self, ret, key=None): + message = self.get_message(ret=ret, key=key) + headers = message['headers'] + body = message['body'] + conn = self.connect_to_messaging_broker() + workflow_context = self._work_context + if workflow_context.workflow_type in [WorkflowType.iWorkflow, WorkflowType.iWorkflowLocal]: conn.send(body=body, destination=workflow_context.broker_destination, id='idds-iworkflow_%s' % self.internal_id, ack='auto', headers=headers ) - self.logger.info("publish header: %s, body: %s" % (str(headers), str(body))) + self.logger.info(f"published header: {headers}, body: {body}") elif workflow_context.workflow_type == WorkflowType.iWork: - headers = {'persistent': 'true', - 'type': 'iwork', - 'internal_id': str(self.internal_id), - 'request_id': workflow_context.request_id, - 'transform_id': workflow_context.transform_id} - body = json_dumps({'ret': ret, 'key': key, 'internal_id': self.internal_id}) conn.send(body=body, destination=workflow_context.broker_destination, id='idds-iwork_%s' % self.internal_id, ack='auto', headers=headers ) - self.logger.info("publish header: %s, body: %s" % (str(headers), str(body))) - self.disconnect() + self.logger.info(f"published header: {headers}, body: {body}") + # self.disconnect() - def run_subscriber(self): + def get_request_id_internal_id(self): + workflow_context = self._work_context + request_id, transform_id, internal_id = None, None, None + if workflow_context.workflow_type in [WorkflowType.iWorkflow, WorkflowType.iWorkflowLocal]: + request_id = workflow_context.request_id + transform_id = 0 + internal_id = self.internal_id + elif workflow_context.workflow_type == WorkflowType.iWork: + request_id = workflow_context.request_id + transform_id = workflow_context.transform_id + internal_id = self.internal_id + else: + request_id = workflow_context.request_id + transform_id = 0 + internal_id = self.internal_id + return request_id, transform_id, internal_id + + def publish_through_panda_server(self, message): + request_id, transform_id, internal_id = self.get_request_id_internal_id() + + import idds.common.utils as idds_utils + import pandaclient.idds_api as idds_api + idds_server = self._work_context.get_idds_server() + # request_id = self._context.request_id + client = idds_api.get_api(idds_utils.json_dumps, + idds_host=idds_server, + compress=True, + manager=True) + status, ret = client.send_messages(request_id=request_id, transform_id=transform_id, internal_id=internal_id, msgs=[message]) + if status: + self.logger.info(f"published message through panda server: {message}") + else: + self.logger.error(f"failed to publish message through panda server, status: {status}, ret: {ret}") + + def publish_through_idds_server(self, message): + request_id, transform_id, internal_id = self.get_request_id_internal_id() + + from idds.client.clientmanager import ClientManager + client = ClientManager(host=self._work_context.get_idds_server()) + status, ret = client.send_messages(request_id=request_id, transform_id=transform_id, internal_id=internal_id, msgs=[message]) + if status: + self.logger.info(f"published message through idds server: {message}") + else: + self.logger.error(f"failed to publish message through idds server, status: {status}, ret: {ret}") + + def publish_through_api(self, ret, key=None, force=False): + message = self.get_message(ret=ret, key=key) + # headers = message['headers'] + # body = message['body'] + message['msg_type'] = 'async_result' + + try: + request_id, transform_id, internal_id = self.get_request_id_internal_id() + if request_id is None: + if force: + request_id = 0 + else: + self.logger.warn("Not to publish message through API since the request id is None") + return + + if self._work_context.service == 'panda': + self.publish_through_panda_server(message) + else: + self.publish_through_idds_server(message) + except Exception as ex: + self.logger.error(f"Failed to publish message through API: {ex}") + + def publish(self, ret, key=None, force=False): + stomp_failed = False + if with_stomp and self._broker_initialized: + try: + self.logger.info("publishing results through messaging brokers") + self.publish_message(ret=ret, key=key) + except Exception as ex: + self.logger.warn(f"Failed to publish result through messaging brokers: {ex}") + stomp_failed = True + + if not with_stomp or not self._broker_initialized or stomp_failed: + self.logger.info("publishing results through http API") + self.publish_through_api(ret=ret, key=key, force=force) + + def poll_messages_through_panda_server(self, request_id, transform_id, internal_id): + if request_id is None: + self.logger.warn("Not to poll message through panda server, since the request_id is None") + return [] + + import idds.common.utils as idds_utils + import pandaclient.idds_api as idds_api + idds_server = self._work_context.get_idds_server() + # request_id = self._work_context.request_id + client = idds_api.get_api(idds_utils.json_dumps, + idds_host=idds_server, + compress=True, + manager=True) + status, messages = client.get_messages(request_id=request_id, transform_id=transform_id, internal_id=internal_id) + if status: + self.logger.info(f"poll message through panda server, number of messages: {len(messages)}") + return messages + else: + self.logger.error(f"failed to poll messages through panda server, error: {messages}") + return [] + + def poll_messages_through_idds_server(self, request_id, transform_id, internal_id): + if request_id is None: + self.logger.warn("Not to poll message through idds server, since the request_id is None") + return [] + + from idds.client.clientmanager import ClientManager + client = ClientManager(host=self._work_context.get_idds_server()) + status, messages = client.get_messages(request_id=request_id, transform_id=transform_id, internal_id=internal_id) + if status: + self.logger.info(f"poll message through idds server, number of messages: {len(messages)}") + return messages + else: + self.logger.error(f"failed to poll messages through idds server, error: {messages}") + return [] + + def poll_messages(self, force=False): + try: + request_id, transform_id, internal_id = self.get_request_id_internal_id() + if request_id is None: + if force: + request_id = 0 + else: + self.logger.warn("Not to poll message, since the request_id is None") + return + + if self._work_context.service == 'panda': + messages = self.poll_messages_through_panda_server(request_id=request_id, transform_id=transform_id, internal_id=internal_id) + else: + messages = self.poll_messages_through_idds_server(request_id=request_id, transform_id=transform_id, internal_id=internal_id) + + for message in messages: + self._queue.put(message) + except Exception as ex: + self.logger.error(f"Failed to poll message: {ex}") + + def run_subscriber(self, force=False): try: self.logger.info("run subscriber") - self.subscribe_to_messaging_brokers() + if with_stomp and self._broker_initialized and self._num_stomp_failures < self._max_stomp_failures: + try: + self.subscribe_to_messaging_brokers(force=True) + except Exception as ex: + self.logger.warn(f"run subscriber fails to subscribe to message broker: {ex}") + self._num_stomp_failures += 1 + self._is_messaging_ok = False + self._broker_initialized = False + while self._graceful_stop and not self._graceful_stop.is_set(): - has_failed_conns = False - for conn in self._connections: - if not conn.is_connected(): - has_failed_conns = True - if has_failed_conns: - self.subscribe_to_messaging_brokers() - time.sleep(1) + if with_stomp and self._broker_initialized and self._is_messaging_ok and self._num_stomp_failures < self._max_stomp_failures: + has_failed_conns = False + for conn in self._subscribe_connections: + if not conn.is_connected(): + has_failed_conns = True + if has_failed_conns: + self.subscribe_to_messaging_brokers(force=True) + time.sleep(1) + else: + self.poll_messages(force=force) + time.sleep(self._poll_period) + + self.poll_messages(force=force) self.stop() except Exception as ex: self.logger.error("run subscriber failed with error: %s" % str(ex)) @@ -492,10 +731,10 @@ def get_results(self, nologs=True): def get_results_percentage(self): return self._results_percentage - def subscribe(self): + def subscribe(self, force=False): if not self._subscribed: self._graceful_stop = GracefulEvent() - thread = threading.Thread(target=self.run_subscriber, name="RunSubscriber") + thread = threading.Thread(target=self.run_subscriber, kwargs={'force': force}, name="RunSubscriber") thread.start() time.sleep(1) self._subscribed = True @@ -559,3 +798,20 @@ def wait_result(self, timeout=None, force_return_results=False): self.wait_results(timeout=timeout, force_return_results=force_return_results) results = self.results return results + + def is_ok(self): + try: + self.subscribe(force=True) + test_id = str(uuid.uuid4()) + self.publish(test_id, force=True) + ret = self.wait_result(force_return_results=True) + self.logger.info(f"AsyncResult: publish: {test_id}, received: {ret}") + if test_id == ret: + self.logger.info("AsyncResult is ok") + return True + else: + self.logger.info("AsyncResult is not ok") + return False + except Exception as ex: + self.logger.error(f"AsyncResult is not ok: {ex}") + return False