diff --git a/common/lib/idds/common/constants.py b/common/lib/idds/common/constants.py index 62167c10..f9f23f66 100644 --- a/common/lib/idds/common/constants.py +++ b/common/lib/idds/common/constants.py @@ -491,6 +491,7 @@ class MessageSource(IDDSEnum): Carrier = 3 Conductor = 4 Rest = 5 + OutSide = 6 class MessageDestination(IDDSEnum): diff --git a/main/lib/idds/agents/carrier/iutils.py b/main/lib/idds/agents/carrier/iutils.py index 9f1ac167..733af5b2 100644 --- a/main/lib/idds/agents/carrier/iutils.py +++ b/main/lib/idds/agents/carrier/iutils.py @@ -184,17 +184,20 @@ def handle_messages_asyncresult(messages, logger=None, log_prefix='', update_pro req_msgs = {} - for msg in messages: - if 'from_idds' in msg and msg['from_idds']: - continue + for item in messages: + if 'from_idds' in item: + if type(item['from_idds']) in [bool] and item['from_idds'] or type(item['from_idds']) in [str] and item['from_idds'].lower() == 'true': + continue + + msg = item['msg'] # ret = msg['ret'] # key = msg['key'] # internal_id = msg['internal_id'] # msg_type = msg['type'] - request_id = msg['request_id'] - transform_id = msg.get('transform_id', 0) - internal_id = msg.get('internal_id', None) + request_id = msg['body']['request_id'] + transform_id = msg['body'].get('transform_id', 0) + internal_id = msg['body'].get('internal_id', None) # if msg_type in ['iworkflow']: if request_id not in req_msgs: @@ -203,21 +206,17 @@ def handle_messages_asyncresult(messages, logger=None, log_prefix='', update_pro req_msgs[request_id][transform_id] = {} if internal_id not in req_msgs[request_id][transform_id]: req_msgs[request_id][transform_id][internal_id] = [] - req_msgs[request_id][transform_id][internal_id].append(msg) - - for request_id in req_msgs: - for transform_id in req_msgs[request_id]: - for internal_id in req_msgs[request_id][transform_id]: - msgs = req_msgs[request_id][transform_id][internal_id] - core_messages.add_message(msg_type=MessageType.AsyncResult, - 
status=MessageStatus.NoNeedDelivery, - destination=MessageDestination.AsyncResult, - source=MessageSource.Outside, - request_id=request_id, - workload_id=None, - transform_id=transform_id, - internal_id=internal_id, - num_contents=len(msgs), - msg_content=msgs) - - logger.debug(f"{log_prefix} handle_messages_asyncresult, add {len(msgs)} for request_id {request_id} transform_id {transform_id} internal_id {internal_id}") + + msgs = [msg] + core_messages.add_message(msg_type=MessageType.AsyncResult, + status=MessageStatus.NoNeedDelivery, + destination=MessageDestination.AsyncResult, + source=MessageSource.OutSide, + request_id=request_id, + workload_id=None, + transform_id=transform_id, + internal_id=internal_id, + num_contents=len(msgs), + msg_content=msgs) + + logger.debug(f"{log_prefix} handle_messages_asyncresult, add {len(msgs)} for request_id {request_id} transform_id {transform_id} internal_id {internal_id}") diff --git a/main/lib/idds/agents/carrier/receiver.py b/main/lib/idds/agents/carrier/receiver.py index 612bda5e..64e2d7f3 100644 --- a/main/lib/idds/agents/carrier/receiver.py +++ b/main/lib/idds/agents/carrier/receiver.py @@ -110,10 +110,12 @@ def get_output_messages(self): if msg_size < 10: self.logger.debug("Received message(only log first 10 messages): %s" % str(msg)) name = msg['name'] - body = msg['body'] + # headers = msg['headers'] + # body = msg['body'] + # from_idds = msg['from_idds'] if name not in msgs: msgs[name] = [] - msgs[name].append(body) + msgs[name].append(msg) msg_size += 1 if msg_size >= self.bulk_message_size: break @@ -151,7 +153,10 @@ def add_receiver_monitor_task(self): self.add_task(task) def handle_messages(self, output_messages, log_prefix): - ret_msg_handle = handle_messages_processing(output_messages, + output_messages_new = [] + for msg in output_messages: + output_messages_new.append(msg['msg']['body']) + ret_msg_handle = handle_messages_processing(output_messages_new, logger=self.logger, log_prefix=log_prefix, 
update_processing_interval=self.update_processing_interval) @@ -202,7 +207,7 @@ def handle_messages_asyncresult(self, output_messages, log_prefix): def handle_messages_channels(self, output_messages, log_prefix): for channel in output_messages: - if channel in ['asyncresult']: + if channel in ['asyncresult', 'AsyncResult']: self.handle_messages_asyncresult(output_messages[channel], log_prefix) else: self.handle_messages(output_messages[channel], log_prefix) diff --git a/main/lib/idds/agents/common/plugins/messaging.py b/main/lib/idds/agents/common/plugins/messaging.py index a594a479..46a7730a 100644 --- a/main/lib/idds/agents/common/plugins/messaging.py +++ b/main/lib/idds/agents/common/plugins/messaging.py @@ -52,8 +52,10 @@ def on_error(self, frame): self.logger.error('[broker] [%s]: %s', self.__broker, frame.body) def on_message(self, frame): - self.logger.debug('[broker] %s [%s]: %s', self.name, self.__broker, frame.body) - self.__output_queue.put({'name': self.name, 'msg': frame.body}) + self.logger.debug(f'[broker] {self.name} [{self.__broker}]: headers: {frame.headers}, body: {frame.body}') + headers = frame.headers + from_idds = headers.get('from_idds', 'false') + self.__output_queue.put({'name': self.name, 'from_idds': from_idds, 'msg': {'headers': frame.headers, 'body': json_loads(frame.body)}}) pass @@ -209,17 +211,30 @@ def send_message(self, msg): destination = msg['destination'] if 'destination' in msg else 'default' conn, queue_dest, destination = self.get_connection(destination) + from_idds = 'false' + if 'from_idds' in msg and msg['from_idds']: + from_idds = 'true' + if conn: self.logger.info("Sending message to message broker(%s): %s" % (destination, msg['msg_id'])) self.logger.debug("Sending message to message broker(%s): %s" % (destination, json_dumps(msg['msg_content']))) - conn.send(body=json_dumps(msg['msg_content']), - destination=queue_dest, - id='atlas-idds-messaging', - ack='auto', - headers={'persistent': 'true', - 'ttl': 
self.timetolive, - 'vo': 'atlas', - 'msg_type': str(msg['msg_type']).lower()}) + if type(msg['msg_content']) in [dict] and 'headers' in msg['msg_content'] and 'body' in msg['msg_content']: + msg['msg_content']['headers']['from_idds'] = from_idds + conn.send(body=json_dumps(msg['msg_content']['body']), + headers=msg['msg_content']['headers'], + destination=queue_dest, + id='atlas-idds-messaging', + ack='auto') + else: + conn.send(body=json_dumps(msg['msg_content']), + destination=queue_dest, + id='atlas-idds-messaging', + ack='auto', + headers={'persistent': 'true', + 'ttl': self.timetolive, + 'vo': 'atlas', + 'from_idds': from_idds, + 'msg_type': str(msg['msg_type']).lower()}) else: self.logger.info("No brokers defined, discard(%s): %s" % (destination, msg['msg_id'])) @@ -260,8 +275,9 @@ def __init__(self, name="MessagingReceiver", logger=None, **kwargs): def get_listener(self, broker, name): if self.listener is None: self.listener = {} - self.listener[name] = MessagingListener(broker, self.output_queue, name=name, logger=self.logger) - return self.listener + if name not in self.listener: + self.listener[name] = MessagingListener(broker, self.output_queue, name=name, logger=self.logger) + return self.listener[name] def subscribe(self): self.receiver_conns = self.connect_to_messaging_brokers() @@ -298,7 +314,7 @@ def execute_subscribe(self): for name in self.receiver_conns: for conn in self.receiver_conns[name]: if not conn.is_connected(): - conn.set_listener('message-receiver', self.get_listener(conn.transport._Transport__host_and_ports[0])) + conn.set_listener('message-receiver', self.get_listener(conn.transport._Transport__host_and_ports[0], name)) # conn.start() conn.connect(self.channels[name]['username'], self.channels[name]['password'], wait=True) conn.subscribe(destination=self.channels[name]['destination'], id='atlas-idds-messaging', ack='auto') diff --git a/main/lib/idds/agents/conductor/conductor.py b/main/lib/idds/agents/conductor/conductor.py index 
8e0ceab8..2a742702 100644 --- a/main/lib/idds/agents/conductor/conductor.py +++ b/main/lib/idds/agents/conductor/conductor.py @@ -111,7 +111,7 @@ def get_messages(self): if BaseAgent.min_request_id is None: return [] - destination = [MessageDestination.Outside, MessageDestination.ContentExt] + destination = [MessageDestination.Outside, MessageDestination.ContentExt, MessageDestination.AsyncResult] messages = core_messages.retrieve_messages(status=MessageStatus.New, min_request_id=BaseAgent.min_request_id, bulk_size=self.retrieve_bulk_size, @@ -196,6 +196,8 @@ def is_message_processed(self, message): self.logger.info("message %s has reached max retries %s" % (message['msg_id'], self.max_retries)) return True msg_type = message['msg_type'] + if msg_type in [MessageType.AsyncResult]: + return True if msg_type not in [MessageType.ProcessingFile]: if retries < self.replay_times: return False @@ -286,6 +288,7 @@ def run(self): to_discard_messages = [] for message in messages: message['destination'] = message['destination'].name + message['from_idds'] = True num_contents += message['num_contents'] if self.is_message_processed(message): diff --git a/main/lib/idds/core/messages.py b/main/lib/idds/core/messages.py index 457d6803..78f292c6 100644 --- a/main/lib/idds/core/messages.py +++ b/main/lib/idds/core/messages.py @@ -50,7 +50,7 @@ def add_messages(messages, bulk_size=1000, session=None): def retrieve_messages(bulk_size=None, msg_type=None, status=None, destination=None, source=None, request_id=None, workload_id=None, transform_id=None, processing_id=None, use_poll_period=False, retries=None, delay=None, - min_request_id=None, fetching_id=None, session=None): + min_request_id=None, fetching_id=None, internal_id=None, session=None): """ Retrieve up to $bulk messages. 
@@ -71,7 +71,7 @@ def retrieve_messages(bulk_size=None, msg_type=None, status=None, destination=No request_id=request_id, workload_id=workload_id, transform_id=transform_id, processing_id=processing_id, retries=retries, delay=delay, fetching_id=fetching_id, - min_request_id=min_request_id, + min_request_id=min_request_id, internal_id=internal_id, use_poll_period=use_poll_period, session=session) diff --git a/main/lib/idds/orm/base/alembic/versions/3073c5de8f73_add_conditions_and_campaign.py b/main/lib/idds/orm/base/alembic/versions/3073c5de8f73_add_conditions_and_campaign.py new file mode 100644 index 00000000..b4d8b4eb --- /dev/null +++ b/main/lib/idds/orm/base/alembic/versions/3073c5de8f73_add_conditions_and_campaign.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Authors: +# - Wen Guan, , 2024 + +"""add conditions and campaign + +Revision ID: 3073c5de8f73 +Revises: 40ead97e63c6 +Create Date: 2024-08-05 13:21:37.265614+00:00 + +""" +import datetime + +from alembic import op +from alembic import context +import sqlalchemy as sa + +from idds.common.constants import ConditionStatus +from idds.orm.base.types import EnumWithValue +from idds.orm.base.types import JSON + +# revision identifiers, used by Alembic. 
+revision = '3073c5de8f73' +down_revision = '40ead97e63c6' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + if context.get_context().dialect.name in ['oracle', 'mysql', 'postgresql']: + schema = context.get_context().version_table_schema if context.get_context().version_table_schema else '' + + op.add_column('requests', sa.Column('campaign', sa.String(100)), schema=schema) + op.add_column('requests', sa.Column('campaign_group', sa.String(250)), schema=schema) + op.add_column('requests', sa.Column('campaign_tag', sa.String(20)), schema=schema) + + op.add_column('transforms', sa.Column('internal_id', sa.String(20)), schema=schema) + op.add_column('transforms', sa.Column('has_previous_conditions', sa.Integer()), schema=schema) + op.add_column('transforms', sa.Column('loop_index', sa.Integer()), schema=schema) + op.add_column('transforms', sa.Column('cloned_from', sa.BigInteger()), schema=schema) + op.add_column('transforms', sa.Column('triggered_conditions', JSON()), schema=schema) + op.add_column('transforms', sa.Column('untriggered_conditions', JSON()), schema=schema) + + op.create_table('conditions', + sa.Column('condition_id', sa.BigInteger(), sa.Sequence('CONDITION_ID_SEQ', schema=schema)), + sa.Column('request_id', sa.BigInteger(), nullable=False), + sa.Column('internal_id', sa.String(20), nullable=False), + sa.Column('status', EnumWithValue(ConditionStatus), nullable=False), + sa.Column('is_loop', sa.Integer()), + sa.Column('loop_index', sa.Integer()), + sa.Column('cloned_from', sa.BigInteger()), + sa.Column("created_at", sa.DateTime, default=datetime.datetime.utcnow, nullable=False), + sa.Column("updated_at", sa.DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False), + sa.Column("evaluate_result", sa.String(200)), + sa.Column('previous_transforms', JSON()), + sa.Column('following_transforms', JSON()), + sa.Column('condition', JSON()), + schema=schema) + op.create_primary_key('CONDITION_PK', 
'conditions', ['condition_id'], schema=schema) + op.create_unique_constraint('CONDITION_ID_UQ', 'conditions', ['request_id', 'internal_id'], schema=schema) + + +def downgrade() -> None: + if context.get_context().dialect.name in ['oracle', 'mysql', 'postgresql']: + schema = context.get_context().version_table_schema if context.get_context().version_table_schema else '' + + op.drop_column('requests', 'campaign', schema=schema) + op.drop_column('requests', 'campaign_group', schema=schema) + op.drop_column('requests', 'campaign_tag', schema=schema) + + op.drop_column('transforms', 'internal_id', schema=schema) + op.drop_column('transforms', 'has_previous_conditions', schema=schema) + op.drop_column('transforms', 'loop_index', schema=schema) + op.drop_column('transforms', 'cloned_from', schema=schema) + op.drop_column('transforms', 'triggered_conditions', schema=schema) + op.drop_column('transforms', 'untriggered_conditions', schema=schema) + + op.drop_constraint('CONDITION_ID_UQ', table_name='conditions', schema=schema) + op.drop_constraint('CONDITION_PK', table_name='conditions', schema=schema) + op.drop_table('conditions', schema=schema) diff --git a/main/lib/idds/orm/base/alembic/versions/40ead97e63c6_messages_table_add_internal_id.py b/main/lib/idds/orm/base/alembic/versions/40ead97e63c6_messages_table_add_internal_id.py new file mode 100644 index 00000000..9fd5a3d7 --- /dev/null +++ b/main/lib/idds/orm/base/alembic/versions/40ead97e63c6_messages_table_add_internal_id.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Authors: +# - Wen Guan, , 2024 + +"""messages table add internal_id + +Revision ID: 40ead97e63c6 +Revises: cc9f730e54c5 +Create Date: 2024-07-01 14:02:47.670000+00:00 + +""" +from alembic import op +from alembic import context +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '40ead97e63c6' +down_revision = 'cc9f730e54c5' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + if context.get_context().dialect.name in ['oracle', 'mysql', 'postgresql']: + schema = context.get_context().version_table_schema if context.get_context().version_table_schema else '' + op.add_column('messages', sa.Column('internal_id', sa.String(20)), schema=schema) + + +def downgrade() -> None: + if context.get_context().dialect.name in ['oracle', 'mysql', 'postgresql']: + schema = context.get_context().version_table_schema if context.get_context().version_table_schema else '' + op.drop_column('messages', 'internal_id', schema=schema) diff --git a/main/lib/idds/orm/messages.py b/main/lib/idds/orm/messages.py index da619921..9bb7efcd 100644 --- a/main/lib/idds/orm/messages.py +++ b/main/lib/idds/orm/messages.py @@ -137,7 +137,7 @@ def retrieve_messages(bulk_size=1000, msg_type=None, status=None, source=None, destination=None, request_id=None, workload_id=None, transform_id=None, processing_id=None, fetching_id=None, min_request_id=None, use_poll_period=False, retries=None, - delay=None, session=None): + delay=None, internal_id=None, session=None): """ Retrieve up to $bulk messages. 
@@ -183,6 +183,8 @@ def retrieve_messages(bulk_size=1000, msg_type=None, status=None, source=None, query = query.filter_by(transform_id=transform_id) if processing_id is not None: query = query.filter_by(processing_id=processing_id) + if internal_id is not None: + query = query.filter_by(internal_id=internal_id) if retries: query = query.filter_by(retries=retries) if delay: diff --git a/main/lib/idds/rest/v1/messages.py b/main/lib/idds/rest/v1/messages.py index e41248cb..acd8f599 100644 --- a/main/lib/idds/rest/v1/messages.py +++ b/main/lib/idds/rest/v1/messages.py @@ -128,18 +128,18 @@ def post(self, request_id, workload_id, transform_id, internal_id): try: msg = self.get_request().data and json_loads(self.get_request().data) - if 'msg_type' in msg and msg['msg_type'] in ['async_result']: - msg['from_idds'] = 'true' - add_message(msg_type=MessageType.AsyncResult, - status=MessageStatus.New, - destination=MessageDestination.AsyncResult, - source=MessageSource.Rest, - request_id=request_id, - workload_id=workload_id, - transform_id=transform_id, - internal_id=internal_id, - num_contents=1, - msg_content=msg) + if type(msg) in (list, tuple) and msg and type(msg[0]) in [dict] and 'headers' in msg[0] and 'channel' in msg[0]['headers'] and msg[0]['headers']['channel'] == 'asyncresult': + for msg_item in msg: + add_message(msg_type=MessageType.AsyncResult, + status=MessageStatus.New, + destination=MessageDestination.AsyncResult, + source=MessageSource.Rest, + request_id=request_id, + workload_id=workload_id, + transform_id=transform_id, + internal_id=internal_id, + num_contents=1, + msg_content=msg_item) elif 'command' in msg and msg['command'] in ['update_request', 'update_processing']: status = msg['parameters']['status'] if status in [RequestStatus.ToCancel, RequestStatus.ToSuspend]: diff --git a/main/lib/idds/tests/test_migrate_requests.py b/main/lib/idds/tests/test_migrate_requests.py index 57efd7ff..8746ab1d 100644 --- a/main/lib/idds/tests/test_migrate_requests.py +++ 
b/main/lib/idds/tests/test_migrate_requests.py @@ -77,15 +77,15 @@ def migrate(): # for old_request_id in [152]: # for old_request_id in [60]: # noqa E115 # for old_request_id in [200]: # noqa E115 - old_request_ids = [635] + old_request_ids = [8526] for old_request_id in old_request_ids: # noqa E115 # doma 183 reqs = cm1.get_requests(request_id=old_request_id, with_metadata=True) cm2 = ClientManager(host=dev_host) # cm2 = ClientManager(host=doma_host) # cm2 = ClientManager(host=atlas_host) - cm2 = ClientManager(host=slac_k8s_dev_host) - # cm2 = ClientManager(host=slac_k8s_prod_host) + # cm2 = ClientManager(host=slac_k8s_dev_host) + cm2 = ClientManager(host=slac_k8s_prod_host) # cm2 = ClientManager(host=cern_k8s_dev_host) # print(reqs) diff --git a/main/tools/container/docker/hpo/commands b/main/tools/container/docker/hpo/commands index 4bc02f65..898f5034 100644 --- a/main/tools/container/docker/hpo/commands +++ b/main/tools/container/docker/hpo/commands @@ -4,3 +4,7 @@ docker login --username=wguanicedew docker images docker tag idds_hpo_nevergrad wguanicedew/idds_hpo_nevergrad docker push wguanicedew/idds_hpo_nevergrad + + +podman build --tag ml_test . 
+ podman run -it -v /tmp/wguan:/data ml_test diff --git a/monitor/data/conf.js b/monitor/data/conf.js index 698f534c..c416974a 100644 --- a/monitor/data/conf.js +++ b/monitor/data/conf.js @@ -1,9 +1,9 @@ var appConfig = { - 'iddsAPI_request': "https://lxplus994.cern.ch:443/idds/monitor_request/null/null", - 'iddsAPI_transform': "https://lxplus994.cern.ch:443/idds/monitor_transform/null/null", - 'iddsAPI_processing': "https://lxplus994.cern.ch:443/idds/monitor_processing/null/null", - 'iddsAPI_request_detail': "https://lxplus994.cern.ch:443/idds/monitor/null/null/true/false/false", - 'iddsAPI_transform_detail': "https://lxplus994.cern.ch:443/idds/monitor/null/null/false/true/false", - 'iddsAPI_processing_detail': "https://lxplus994.cern.ch:443/idds/monitor/null/null/false/false/true" + 'iddsAPI_request': "https://lxplus943.cern.ch:443/idds/monitor_request/null/null", + 'iddsAPI_transform': "https://lxplus943.cern.ch:443/idds/monitor_transform/null/null", + 'iddsAPI_processing': "https://lxplus943.cern.ch:443/idds/monitor_processing/null/null", + 'iddsAPI_request_detail': "https://lxplus943.cern.ch:443/idds/monitor/null/null/true/false/false", + 'iddsAPI_transform_detail': "https://lxplus943.cern.ch:443/idds/monitor/null/null/false/true/false", + 'iddsAPI_processing_detail': "https://lxplus943.cern.ch:443/idds/monitor/null/null/false/false/true" } diff --git a/workflow/lib/idds/iworkflow/asyncresult.py b/workflow/lib/idds/iworkflow/asyncresult.py index ae55569e..4c85f512 100644 --- a/workflow/lib/idds/iworkflow/asyncresult.py +++ b/workflow/lib/idds/iworkflow/asyncresult.py @@ -161,7 +161,7 @@ def __init__(self, work_context, name=None, wait_num=1, wait_keys=[], multi_jobs self._work_context.init_brokers() self._broker_initialized = True except Exception as ex: - logging.warn(f"Failed to initialize messaging broker, will use Rest: {ex}") + logging.warn(f"{self.internal_id} Failed to initialize messaging broker, will use Rest: {ex}") self._broker_initialized = False 
self._name = name @@ -170,6 +170,7 @@ def __init__(self, work_context, name=None, wait_num=1, wait_keys=[], multi_jobs self._connections = [] self._subscribe_connections = [] self._graceful_stop = False + self._is_stop = False self._subscribe_thread = None self._subscribed = False @@ -232,7 +233,8 @@ def wait_keys(self): for kwargs in self._multi_jobs_kwargs_list: k = get_unique_id_for_dict(kwargs) k = "%s:%s" % (self._name, k) - self.logger.info("args (%s) to key: %s" % (str(kwargs), k)) + request_id, transform_id, internal_id = self.get_request_id_internal_id() + self.logger.info(f"request_id {request_id} transform_id {transform_id} internal_id {internal_id} args ({kwargs}) to key: {k}") self._wait_keys.add(k) self._wait_num = len(self._wait_keys) return self._wait_keys @@ -249,7 +251,15 @@ def is_all_results_available(self): @is_all_results_available.setter def is_all_results_available(self, value): - raise Exception("Not allowd to set is_all_results_available") + raise Exception(f"{self.internal_id} Not allowd to set is_all_results_available") + + @property + def is_stop(self): + return self._is_stop + + @is_stop.setter + def is_stop(self, value): + raise Exception(f"{self.internal_id} Not allowd to set is_stop") @property def is_finished(self): @@ -261,7 +271,7 @@ def is_finished(self): @is_finished.setter def is_finished(self, value): - raise Exception("Not allowd to set is_finished") + raise Exception(f"{self.internal_id} Not allowd to set is_finished") @property def is_subfinished(self): @@ -273,7 +283,7 @@ def is_subfinished(self): @is_subfinished.setter def is_subfinished(self, value): - raise Exception("Not allowd to set is_subfinished") + raise Exception(f"{self.internal_id} Not allowd to set is_subfinished") @property def is_failed(self): @@ -285,7 +295,7 @@ def is_failed(self): @is_failed.setter def is_failed(self, value): - raise Exception("Not allowd to set is_failed") + raise Exception(f"{self.internal_id} Not allowd to set is_failed") @property 
def is_terminated(self): @@ -293,7 +303,7 @@ def is_terminated(self): @is_terminated.setter def is_terminated(self, value): - raise Exception("Not allowd to set is_terminated") + raise Exception(f"{self.internal_id} Not allowd to set is_terminated") @property def results(self): @@ -308,13 +318,13 @@ def results(self): else: self._bad_results.append(ret) except Exception as ex: - self.logger.error("Received bad result: %s: %s" % (str(ret), str(ex))) + self.logger.error(f"{self.internal_id} Received bad result: {ret}: {ex}") if self._bad_results: - self.logger.error("Received bad results: %s" % str(self._bad_results)) + self.logger.error(f"{self.internal_id} Received bad results: {self._bad_results}") if not self._nologs: - self.logger.debug("_results: %s, bad_results: %s" % (str(self._results), str(self._bad_results))) - self.logger.debug("wait_keys: %s, wait_num: %s" % (str(self.wait_keys), self._wait_num)) + self.logger.debug(f"{self.internal_id} _results: {self._results}, bad_results: {self._bad_results}") + self.logger.debug(f"{self.internal_id} wait_keys: {self.wait_keys}, wait_num: {self._wait_num}") rets_dict = {} for result in self._results: @@ -338,7 +348,7 @@ def results(self): ret_map.add_result(key=k, result=rets[k]) if has_new_data: - self.logger.debug('percent %s, results: %s' % (self._results_percentage, str(ret_map))) + self.logger.debug(f'{self.internal_id} percent {self._results_percentage}, results: {ret_map}') return ret_map else: @@ -353,7 +363,7 @@ def results(self): self._results_percentage = len(rets) * 1.0 / self._wait_num if has_new_data: - self.logger.debug('percent %s, results: %s' % (self._results_percentage, str(rets))) + self.logger.debug(f'{self.internal_id} percent {self._results_percentage}, results: {rets}') if self._wait_num == 1: if rets: @@ -364,9 +374,9 @@ def results(self): @results.setter def results(self, value): - raise Exception("Not allowed to set results.") + raise Exception(f"{self.internal_id} Not allowed to set 
results.") if type(value) not in [list, tuple]: - raise Exception("Results must be list or tuple, currently it is %s" % type(value)) + raise Exception(f"{self.internal_id} Results must be list or tuple, currently it is {value}") self._results = value def disconnect(self): @@ -450,10 +460,10 @@ def subscribe_to_messaging_brokers(self, force=False): b_addr = addrinfo[4][0] broker_addresses.append((b_addr, port)) except socket.gaierror as error: - self.logger.error('Cannot resolve hostname %s: %s' % (b, str(error))) + self.logger.error(f'{self.internal_id} Cannot resolve hostname {b}: {error}') self._graceful_stop.set() - self.logger.info("Resolved broker addresses: %s" % (broker_addresses)) + self.logger.info(f"{self.internal_id} Resolved broker addresses: {broker_addresses}") timeout = workflow_context.broker_timeout @@ -487,7 +497,7 @@ def subscribe_to_messaging_brokers(self, force=False): # ack='auto', conf=subscribe_selector) conn.subscribe(destination=workflow_context.broker_destination, id=subscribe_id, ack='auto', headers=subscribe_selector) - self.logger.info("subscribe to %s:%s with selector: %s" % (broker, port, subscribe_selector)) + self.logger.info(f"{self.internal_id} subscribe to {broker}:{port} with selector: {subscribe_selector}") conns.append(conn) self._subscribe_connections = conns return conns @@ -499,25 +509,27 @@ def get_message(self, ret, key=None): if self._current_job_kwargs: key = get_unique_id_for_dict(self._current_job_kwargs) key = "%s:%s" % (self._name, key) - self.logger.info("publish args (%s) to key: %s" % (str(self._current_job_kwargs), key)) + self.logger.info(f"{self.internal_id} publish args ({self._current_job_kwargs}) to key: {key}") if workflow_context.workflow_type in [WorkflowType.iWorkflow, WorkflowType.iWorkflowLocal]: headers = {'persistent': 'true', + 'channel': 'asyncresult', 'type': 'iworkflow', 'internal_id': str(self.internal_id), 'request_id': workflow_context.request_id} - body = json_dumps({'ret': ret, 'key': key, 
'internal_id': self.internal_id, 'type': 'iworkflow', - 'request_id': workflow_context.request_id}) + body = {'ret': ret, 'key': key, 'internal_id': self.internal_id, 'type': 'iworkflow', + 'request_id': workflow_context.request_id} message = {"headers": headers, "body": body} elif workflow_context.workflow_type == WorkflowType.iWork: headers = {'persistent': 'true', + 'channel': 'asyncresult', 'type': 'iwork', 'internal_id': str(self.internal_id), 'request_id': workflow_context.request_id, 'transform_id': workflow_context.transform_id} - body = json_dumps({'ret': ret, 'key': key, 'internal_id': self.internal_id, 'type': 'iwork', - 'request_id': workflow_context.request_id, - 'transform_id': workflow_context.transform_id}) + body = {'ret': ret, 'key': key, 'internal_id': self.internal_id, 'type': 'iwork', + 'request_id': workflow_context.request_id, + 'transform_id': workflow_context.transform_id} message = {"headers": headers, "body": body} return message @@ -528,21 +540,21 @@ def publish_message(self, ret, key=None): conn = self.connect_to_messaging_broker() workflow_context = self._work_context if workflow_context.workflow_type in [WorkflowType.iWorkflow, WorkflowType.iWorkflowLocal]: - conn.send(body=body, + conn.send(body=json_dumps(body), destination=workflow_context.broker_destination, id='idds-iworkflow_%s' % self.internal_id, ack='auto', headers=headers ) - self.logger.info(f"published header: {headers}, body: {body}") + self.logger.info(f"{self.internal_id} published header: {headers}, body: {body}") elif workflow_context.workflow_type == WorkflowType.iWork: - conn.send(body=body, + conn.send(body=json_dumps(body), destination=workflow_context.broker_destination, id='idds-iwork_%s' % self.internal_id, ack='auto', headers=headers ) - self.logger.info(f"published header: {headers}, body: {body}") + self.logger.info(f"{self.internal_id} published header: {headers}, body: {body}") # self.disconnect() def get_request_id_internal_id(self): @@ -551,20 +563,19 @@ 
def get_request_id_internal_id(self): if workflow_context.workflow_type in [WorkflowType.iWorkflow, WorkflowType.iWorkflowLocal]: request_id = workflow_context.request_id transform_id = 0 - internal_id = self.internal_id + # internal_id = workflow_context.internal_id elif workflow_context.workflow_type == WorkflowType.iWork: request_id = workflow_context.request_id transform_id = workflow_context.transform_id - internal_id = self.internal_id + # internal_id = workflow_context.internal_id else: request_id = workflow_context.request_id transform_id = 0 - internal_id = self.internal_id + # internal_id = workflow_context.internal_id + internal_id = self.internal_id return request_id, transform_id, internal_id - def publish_through_panda_server(self, message): - request_id, transform_id, internal_id = self.get_request_id_internal_id() - + def publish_through_panda_server(self, request_id, transform_id, internal_id, message): import idds.common.utils as idds_utils import pandaclient.idds_api as idds_api idds_server = self._work_context.get_idds_server() @@ -574,21 +585,19 @@ def publish_through_panda_server(self, message): compress=True, manager=True) status, ret = client.send_messages(request_id=request_id, transform_id=transform_id, internal_id=internal_id, msgs=[message]) - if status: - self.logger.info(f"published message through panda server: {message}") + if status == 0 and type(ret) in (list, tuple) and len(ret) > 1 and ret[0] is True and type(ret[1]) in (list, tuple) and ret[1][0] is True: + self.logger.info(f"{self.internal_id} published message through panda server: {message}") else: - self.logger.error(f"failed to publish message through panda server, status: {status}, ret: {ret}") - - def publish_through_idds_server(self, message): - request_id, transform_id, internal_id = self.get_request_id_internal_id() + self.logger.error(f"{self.internal_id} failed to publish message through panda server, status: {status}, ret: {ret}") + def 
publish_through_idds_server(self, request_id, transform_id, internal_id, message): from idds.client.clientmanager import ClientManager client = ClientManager(host=self._work_context.get_idds_server()) status, ret = client.send_messages(request_id=request_id, transform_id=transform_id, internal_id=internal_id, msgs=[message]) if status: - self.logger.info(f"published message through idds server: {message}") + self.logger.info(f"{self.internal_id} published message through idds server: {message}") else: - self.logger.error(f"failed to publish message through idds server, status: {status}, ret: {ret}") + self.logger.error(f"{self.internal_id} failed to publish message through idds server, status: {status}, ret: {ret}") def publish_through_api(self, ret, key=None, force=False): message = self.get_message(ret=ret, key=key) @@ -602,33 +611,33 @@ def publish_through_api(self, ret, key=None, force=False): if force: request_id = 0 else: - self.logger.warn("Not to publish message through API since the request id is None") + self.logger.warn(f"{self.internal_id} Not to publish message through API since the request id is None") return if self._work_context.service == 'panda': - self.publish_through_panda_server(message) + self.publish_through_panda_server(request_id, transform_id, internal_id, message) else: - self.publish_through_idds_server(message) + self.publish_through_idds_server(request_id, transform_id, internal_id, message) except Exception as ex: - self.logger.error(f"Failed to publish message through API: {ex}") + self.logger.error(f"{self.internal_id} Failed to publish message through API: {ex}") def publish(self, ret, key=None, force=False): stomp_failed = False if with_stomp and self._broker_initialized: try: - self.logger.info("publishing results through messaging brokers") + self.logger.info(f"{self.internal_id} publishing results through messaging brokers") self.publish_message(ret=ret, key=key) except Exception as ex: - self.logger.warn(f"Failed to publish 
result through messaging brokers: {ex}") + self.logger.warn(f"{self.internal_id} Failed to publish result through messaging brokers: {ex}") stomp_failed = True if not with_stomp or not self._broker_initialized or stomp_failed: - self.logger.info("publishing results through http API") + self.logger.info(f"{self.internal_id} publishing results through http API") self.publish_through_api(ret=ret, key=key, force=force) def poll_messages_through_panda_server(self, request_id, transform_id, internal_id): if request_id is None: - self.logger.warn("Not to poll message through panda server, since the request_id is None") + self.logger.warn(f"{self.internal_id} Not to poll message through panda server, since the request_id is None") return [] import idds.common.utils as idds_utils @@ -639,27 +648,30 @@ def poll_messages_through_panda_server(self, request_id, transform_id, internal_ idds_host=idds_server, compress=True, manager=True) - status, messages = client.get_messages(request_id=request_id, transform_id=transform_id, internal_id=internal_id) - if status: - self.logger.info(f"poll message through panda server, number of messages: {len(messages)}") + status, ret = client.get_messages(request_id=request_id, transform_id=transform_id, internal_id=internal_id) + if status == 0 and type(ret) in (list, tuple) and len(ret) > 1 and ret[0] is True and type(ret[1]) in (list, tuple) and ret[1][0] is True: + self.logger.info(f"{self.internal_id} poll message through panda server, ret: {ret}") + messages = ret[1][1] + self.logger.info(f"{self.internal_id} poll message through panda server, number of messages: {len(messages)}") return messages else: - self.logger.error(f"failed to poll messages through panda server, error: {messages}") + self.logger.error(f"{self.internal_id} failed to poll messages through panda server, status: {status}, ret: {ret}") return [] def poll_messages_through_idds_server(self, request_id, transform_id, internal_id): if request_id is None: - 
self.logger.warn("Not to poll message through idds server, since the request_id is None") + self.logger.warn(f"{self.internal_id} Not to poll message through idds server, since the request_id is None") return [] from idds.client.clientmanager import ClientManager client = ClientManager(host=self._work_context.get_idds_server()) status, messages = client.get_messages(request_id=request_id, transform_id=transform_id, internal_id=internal_id) if status: - self.logger.info(f"poll message through idds server, number of messages: {len(messages)}") + self.logger.info(f"{self.internal_id} poll message through panda server, ret: {messages}") + self.logger.info(f"{self.internal_id} poll message through idds server, number of messages: {len(messages)}") return messages else: - self.logger.error(f"failed to poll messages through idds server, error: {messages}") + self.logger.error(f"{self.internal_id} failed to poll messages through idds server, error: {messages}") return [] def poll_messages(self, force=False): @@ -669,7 +681,7 @@ def poll_messages(self, force=False): if force: request_id = 0 else: - self.logger.warn("Not to poll message, since the request_id is None") + self.logger.warn(f"{self.internal_id} Not to poll message, since the request_id is None") return if self._work_context.service == 'panda': @@ -678,22 +690,25 @@ def poll_messages(self, force=False): messages = self.poll_messages_through_idds_server(request_id=request_id, transform_id=transform_id, internal_id=internal_id) for message in messages: - self._queue.put(message) + body = message['body'] + self._queue.put(body) except Exception as ex: - self.logger.error(f"Failed to poll message: {ex}") + self.logger.error(f"{self.internal_id} Failed to poll message: {ex}") def run_subscriber(self, force=False): try: - self.logger.info("run subscriber") + self.logger.info("{self.internal_id} run subscriber") if with_stomp and self._broker_initialized and self._num_stomp_failures < self._max_stomp_failures: try: 
self.subscribe_to_messaging_brokers(force=True) except Exception as ex: - self.logger.warn(f"run subscriber fails to subscribe to message broker: {ex}") + self.logger.warn(f"{self.internal_id} run subscriber fails to subscribe to message broker: {ex}") self._num_stomp_failures += 1 self._is_messaging_ok = False self._broker_initialized = False + time_poll = None + time_start = time.time() while self._graceful_stop and not self._graceful_stop.is_set(): if with_stomp and self._broker_initialized and self._is_messaging_ok and self._num_stomp_failures < self._max_stomp_failures: has_failed_conns = False @@ -704,14 +719,31 @@ def run_subscriber(self, force=False): self.subscribe_to_messaging_brokers(force=True) time.sleep(1) else: - self.poll_messages(force=force) - time.sleep(self._poll_period) + if self._timeout: + sleep_time = min(self._timeout / 3, self._poll_period) + else: + sleep_time = self._poll_period + + if time_poll is None or time.time() - time_poll > sleep_time: + self.poll_messages(force=force) + time_poll = time.time() + time.sleep(1) + + if self._timeout and time.time() - time_start > self._timeout: + self.logger.info(f"{self.internal_id} timeout reached") + break + + if self._graceful_stop and self._graceful_stop.is_set(): + self.logger.info(f"{self.internal_id} graceful stop is set") self.poll_messages(force=force) + self._is_stop = True self.stop() + self.logger.info(f"{self.internal_id} subscriber finished.") except Exception as ex: - self.logger.error("run subscriber failed with error: %s" % str(ex)) + self.logger.error(f"{self.internal_id} run subscriber failed with error: {ex}") self.logger.error(traceback.format_exc()) + self._is_stop = True self.stop() def get_results(self, nologs=True): @@ -719,12 +751,12 @@ def get_results(self, nologs=True): self._nologs = nologs rets = self.results if not self._nologs: - self.logger.debug('percent %s, results: %s' % (self.get_results_percentage(), str(rets))) + self.logger.debug(f'{self.internal_id} percent 
{self.get_results_percentage()}, results: {rets}') percent = self.get_results_percentage() if percent >= self._wait_percent: self.stop() - self.logger.info("Got results: %s (number of wrong keys: %s)" % (percent, self._num_wrong_keys)) + self.logger.info(f"{self.internal_id} Got results: {percent} (number of wrong keys: {self._num_wrong_keys})") self._nologs = old_nologs return rets @@ -738,12 +770,15 @@ def subscribe(self, force=False): thread.start() time.sleep(1) self._subscribed = True + self._is_stop = False def stop(self): if self._graceful_stop: self._graceful_stop.set() self.disconnect() self._subscribed = False + while not self._is_stop: + time.sleep(1) def __del__(self): self.stop() @@ -755,40 +790,44 @@ def wait_results(self, timeout=None, force_return_results=False): time_log = time.time() time_start = time.time() if timeout is None: - self.logger.info("waiting for results") + self.logger.info(f"{self.internal_id} waiting for results") try: while not get_results and self._graceful_stop and not self._graceful_stop.is_set(): self.get_results(nologs=True) percent = self.get_results_percentage() if time.time() - time_log > 600: # 10 minutes - self.logger.info("waiting for results: %s (number of wrong keys: %s)" % (percent, self._num_wrong_keys)) + self.logger.info(f"{self.internal_id} waiting for results: {percent} (number of wrong keys: {self._num_wrong_keys})") time_log = time.time() time.sleep(1) if self.is_all_results_available: get_results = True self.waiting_result_terminated = True - self.logger.info("Got result percentage %s is not smaller then wait_percent %s, set waiting_result_terminated to True" % (percent, self._wait_percent)) + self.logger.info(f"{self.internal_id} Got result percentage {percent} is not smaller then wait_percent {self._wait_percent}, set waiting_result_terminated to True") if self._timeout is not None and self._timeout > 0 and time.time() - time_start > self._timeout: # global timeout - self.logger.info("Waiting result 
timeout(%s seconds), set waiting_result_terminated to True" % self._timeout) + self.logger.info(f"{self.internal_id} Waiting result timeout({self._timeout} seconds), set waiting_result_terminated to True") get_results = True self.waiting_result_terminated = True if timeout is not None and timeout > 0 and time.time() - time_start > timeout: # local timeout + self.logger.info(f"{self.internal_id} timeout reached") break percent = self.get_results_percentage() if timeout is None or time.time() - time_start > 600: - self.logger.info("Got results: %s (number of wrong keys: %s)" % (percent, self._num_wrong_keys)) + self.logger.info(f"{self.internal_id} Got results: {percent} (number of wrong keys: {self._num_wrong_keys})") except Exception as ex: - self.logger.error("Wait_results got some exception: %s" % str(ex)) + self.logger.error(f"Wait_results got some exception: {ex}") self.logger.error(traceback.format_exc()) self._graceful_stop.set() if get_results or self._graceful_stop.is_set() or self.is_all_results_available or force_return_results: # stop the subscriber self._graceful_stop.set() - self.logger.info("Got results: %s (number of wrong keys: %s)" % (percent, self._num_wrong_keys)) + # wait the subscriber to finish + time.sleep(2) + percent = self.get_results_percentage() + self.logger.info(f"{self.internal_id} Got results: {percent} (number of wrong keys: {self._num_wrong_keys})") results = self.results return results @@ -805,13 +844,13 @@ def is_ok(self): test_id = str(uuid.uuid4()) self.publish(test_id, force=True) ret = self.wait_result(force_return_results=True) - self.logger.info(f"AsyncResult: publish: {test_id}, received: {ret}") + self.logger.info(f"{self.internal_id} AsyncResult: publish: {test_id}, received: {ret}") if test_id == ret: - self.logger.info("AsyncResult is ok") + self.logger.info(f"{self.internal_id} AsyncResult is ok") return True else: - self.logger.info("AsyncResult is not ok") + self.logger.info(f"{self.internal_id} AsyncResult is not 
ok") return False except Exception as ex: - self.logger.error(f"AsyncResult is not ok: {ex}") + self.logger.error(f"{self.internal_id} AsyncResult is not ok: {ex}") return False diff --git a/workflow/lib/idds/iworkflow/work.py b/workflow/lib/idds/iworkflow/work.py index 49d2471e..4c14a46d 100644 --- a/workflow/lib/idds/iworkflow/work.py +++ b/workflow/lib/idds/iworkflow/work.py @@ -711,7 +711,7 @@ def get_status_from_panda_server(self): request_id = self._context.request_id transform_id = self._context.transform_id if not transform_id: - log_msg = "No transform id defined (request_id: %s, transform_id: %s)", (request_id, transform_id) + log_msg = f"No transform id defined (request_id: {request_id}, transform_id: {transform_id}, internal_id: {self.internal_id})" logging.error(log_msg) return exceptions.IDDSException(log_msg) @@ -724,17 +724,17 @@ def get_status_from_panda_server(self): try: tf = json_loads(tf) except Exception as ex: - logging.warn("Failed to json loads transform(%s): %s" % (tf, ex)) + logging.warn(f"Failed to json loads transform({tf}): {ex}") else: tf = None - logging.error("Failed to get transform (request_id: %s, transform_id: %s) status from PanDA-iDDS: %s" % (request_id, transform_id, ret)) + logging.error(f"Failed to get transform (request_id: {request_id}, transform_id: {transform_id}, internal_id: {self.internal_id}) status from PanDA-iDDS: {ret}") return TransformStatus.Transforming if not tf: - logging.info("Get transform (request_id: %s, transform_id: %s) from PanDA-iDDS: %s" % (request_id, transform_id, tf)) + logging.info(f"Get transform (request_id: {request_id}, transform_id: {transform_id}, internal_id: {self.internal_id}) from PanDA-iDDS: {tf}") return None - logging.info("Get transform status (request_id: %s, transform_id: %s) from PanDA-iDDS: %s" % (request_id, transform_id, tf['status'])) + logging.info(f"Get transform status (request_id: {request_id}, transform_id: {transform_id}, internal_id: {self.internal_id}) from 
PanDA-iDDS: {tf['status']}") return tf['status'] @@ -745,17 +745,16 @@ def get_status_from_idds_server(self): request_id = self._context.request_id transform_id = self._context.transform_id if not transform_id: - log_msg = "No transform id defined (request_id: %s, transform_id: %s)" % (request_id, transform_id) + log_msg = f"No transform id defined (request_id: {request_id}, transform_id: {transform_id}, internal_id: {self.internal_id})" logging.error(log_msg) return exceptions.IDDSException(log_msg) tf = client.get_transform(request_id=request_id, transform_id=transform_id) if not tf: - logging.info("Get transform (request_id: %s, transform_id: %s) from iDDS: %s" % (request_id, transform_id, tf)) + logging.info(f"Get transform (request_id: {request_id}, transform_id: {transform_id}, internal_id: {self.internal_id}) from iDDS: {tf}") return None - logging.info("Get transform status (request_id: %s, transform_id: %s) from iDDS: %s" % (request_id, transform_id, tf['status'])) - + logging.info(f"Get transform status (request_id: {request_id}, transform_id: {transform_id}, internal_id: {self.internal_id}) from iDDS: {tf['status']}") return tf['status'] def get_status(self): @@ -1077,7 +1076,8 @@ def get_runner(self): clean_env = self.get_clean_env() if clean_env: - cmd = cmd + "; " + clean_env + # cmd = cmd + "; " + clean_env + cmd = cmd + "; ret=$?; " + clean_env + "; exit $ret" return cmd diff --git a/workflow/lib/idds/iworkflow/workflow.py b/workflow/lib/idds/iworkflow/workflow.py index 09210382..4930870e 100644 --- a/workflow/lib/idds/iworkflow/workflow.py +++ b/workflow/lib/idds/iworkflow/workflow.py @@ -378,11 +378,17 @@ def get_broker_info_from_idds_server(self): :raise Exception when failing to get broker information. 
""" + logging.info("Getting broker information through idds server.") # iDDS ClientManager from idds.client.clientmanager import ClientManager client = ClientManager(host=self.get_idds_server()) ret = client.get_metainfo(name='asyncresult_config') + if type(ret) in (list, tuple) and ret[0] is True: + return ret[1] + else: + logging.warn(f"Failed to get broker info: {ret}") + return None return ret @@ -392,6 +398,8 @@ def get_broker_info_from_panda_server(self): :raise Exception when failing to get broker information. """ + logging.info("Get broker information through panda server.") + import idds.common.utils as idds_utils import pandaclient.idds_api as idds_api @@ -402,17 +410,22 @@ def get_broker_info_from_panda_server(self): manager=True) ret = client.get_metainfo(name='asyncresult_config') if ret[0] == 0 and ret[1][0]: - meta_info = ret[1][1] - if type(meta_info) in [dict]: - pass - elif type(meta_info) in [str]: - try: - meta_info = json_loads(meta_info) - except Exception as ex: - logging.warn("Failed to json loads meta info(%s): %s" % (meta_info, ex)) + idds_ret = ret[1][1] + if type(idds_ret) in (list, tuple) and idds_ret[0] is True: + meta_info = idds_ret[1] + if type(meta_info) in [dict]: + pass + elif type(meta_info) in [str]: + try: + meta_info = json_loads(meta_info) + except Exception as ex: + logging.warn("Failed to json loads meta info(%s): %s" % (meta_info, ex)) + else: + meta_info = None + logging.warn("Failed to get meta info: %s" % str(ret)) else: meta_info = None - logging.error("Failed to get meta info: %s" % str(ret)) + logging.warn("Failed to get meta info: %s" % str(ret)) return meta_info @@ -1216,7 +1229,7 @@ def get_runner(self): clean_env = self.get_clean_env() if clean_env: - cmd = cmd + "; " + clean_env + cmd = cmd + "; ret=$?; " + clean_env + "; exit $ret" return cmd