From 6e5fe6676a7bdf64b8287f5ab907ba0ae6024953 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Thu, 28 Mar 2024 16:28:29 +0100 Subject: [PATCH 1/8] add meta info rest and client --- client/lib/idds/client/metainfoclient.py | 49 +++++++++++++++++++++++ main/lib/idds/rest/v1/metainfo.py | 51 ++++++++++++++++++++++++ 2 files changed, 100 insertions(+) create mode 100644 client/lib/idds/client/metainfoclient.py create mode 100644 main/lib/idds/rest/v1/metainfo.py diff --git a/client/lib/idds/client/metainfoclient.py b/client/lib/idds/client/metainfoclient.py new file mode 100644 index 00000000..06f002d1 --- /dev/null +++ b/client/lib/idds/client/metainfoclient.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0OA +# +# Authors: +# - Wen Guan, , 2024 + + +""" +Metainfo Rest client to access IDDS system. +""" + +import os + +from idds.client.base import BaseRestClient + + +class MetaInfoClient(BaseRestClient): + + """MetaInfo Rest client""" + + METAINFO_BASEURL = 'metainfo' + + def __init__(self, host=None, auth=None, timeout=None): + """ + Constructor of the BaseRestClient. + + :param host: the address of the IDDS server. + :param client_proxy: the client certificate proxy. + :param timeout: timeout in seconds. + """ + super(MetaInfoClient, self).__init__(host=host, auth=auth, timeout=timeout) + + def get_metainfo(self, name): + """ + Get meta info + + :param name: name to select different meta info + + :raise exceptions if it's not got successfully. + """ + path = self.METAINFO_BASEURL + url = self.build_url(self.host, path=os.path.join(path, str(name))) + + meta_info = self.get_request_response(url, type='GET') + return meta_info diff --git a/main/lib/idds/rest/v1/metainfo.py b/main/lib/idds/rest/v1/metainfo.py new file mode 100644 index 00000000..e2ed0294 --- /dev/null +++ b/main/lib/idds/rest/v1/metainfo.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0OA +# +# Authors: +# - Wen Guan, , 2024 + + +from traceback import format_exc + +from flask import Blueprint + +from idds.common import exceptions +from idds.common.constants import HTTP_STATUS_CODE +from idds.common.utils import get_asyncresult_config + +from idds.rest.v1.controller import IDDSController + + +class MetaInfo(IDDSController): + """ Get Meta info""" + + def get(self, name): + try: + rets = {} + if name == 'asyncresult_config': + asyncresult_config = get_asyncresult_config() + rets = asyncresult_config + + except exceptions.NoObject as error: + return self.generate_http_response(HTTP_STATUS_CODE.NotFound, exc_cls=error.__class__.__name__, exc_msg=error) + except exceptions.IDDSException as error: + return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=error.__class__.__name__, exc_msg=error) + except Exception as error: + print(error) + print(format_exc()) + return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=exceptions.CoreException.__name__, exc_msg=error) + + return self.generate_http_response(HTTP_STATUS_CODE.OK, data=rets) + + +def get_blueprint(): + bp = Blueprint('metainfo', __name__) + + metainfo_view = MetaInfo.as_view('metainfo') + bp.add_url_rule('/metainfo/', view_func=metainfo_view, methods=['get', ]) + + return bp From 5e1df17e9c68d6f3821806d1288d022e67e94ce2 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Thu, 28 Mar 2024 16:29:46 +0100 Subject: [PATCH 2/8] add close request function --- client/lib/idds/client/client.py | 3 +- client/lib/idds/client/clientmanager.py | 43 +++++++++++++++++++++++-- client/lib/idds/client/requestclient.py | 21 ++++++++++++ common/lib/idds/common/constants.py | 4 +++ common/lib/idds/common/event.py | 15 +++++++++ main/lib/idds/rest/v1/transforms.py | 2 +- 6 files changed, 84 insertions(+), 4 deletions(-) diff --git a/client/lib/idds/client/client.py b/client/lib/idds/client/client.py index 04294a3d..bac14aa8 100644 --- a/client/lib/idds/client/client.py +++ b/client/lib/idds/client/client.py @@ -28,12 +28,13 @@ from idds.client.messageclient import MessageClient from idds.client.pingclient import PingClient from idds.client.authclient import AuthClient +from idds.client.metainfoclient import MetaInfoClient warnings.filterwarnings("ignore") -class Client(RequestClient, TransformClient, CatalogClient, CacherClient, HPOClient, LogsClient, MessageClient, PingClient, AuthClient): +class Client(RequestClient, TransformClient, CatalogClient, CacherClient, HPOClient, LogsClient, MessageClient, PingClient, AuthClient, MetaInfoClient): """Main client class for IDDS rest callings.""" diff --git a/client/lib/idds/client/clientmanager.py b/client/lib/idds/client/clientmanager.py index ca8f13b6..54384c35 100644 --- a/client/lib/idds/client/clientmanager.py +++ b/client/lib/idds/client/clientmanager.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2020 - 2023 +# - Wen Guan, , 2020 - 2024 """ @@ -32,7 +32,7 @@ from idds.common.authentication import OIDCAuthentication, OIDCAuthenticationUtils -from idds.common.utils import setup_logging, get_proxy_path +from idds.common.utils import setup_logging, get_proxy_path, idds_mask from idds.client.version import release_version from idds.client.client import Client @@ -444,6 +444,13 @@ def submit(self, workflow, username=None, userdn=None, use_dataset_name=False): priority = workflow.priority if priority is None: priority = 0 + elif workflow.type in [WorkflowType.iWorkflowLocal]: + scope = 'iworkflowLocal' + request_type = RequestType.iWorkflowLocal + transform_tag = workflow.get_work_tag() + priority = workflow.priority + if priority is None: + priority = 0 except Exception: pass @@ -621,6 +628,24 @@ def abort_task(self, request_id=None, workload_id=None, task_id=None): ret = self.client.abort_request_task(request_id=request_id, workload_id=workload_id, task_id=task_id) return ret + @exception_handler + def close(self, request_id=None, workload_id=None): + """ + Close requests. + + :param workload_id: the workload id. + :param request_id: the request. + """ + self.setup_client() + + if request_id is None and workload_id is None: + logging.error("Both request_id and workload_id are None. One of them should not be None") + return (-1, "Both request_id and workload_id are None. One of them should not be None") + + ret = self.client.close_request(request_id=request_id, workload_id=workload_id) + # return (-1, 'No matching requests') + return ret + @exception_handler def retry(self, request_id=None, workload_id=None): """ @@ -840,3 +865,17 @@ def update_build_request(self, request_id, signature, workflow): self.setup_client() return self.client.update_build_request(request_id=request_id, signature=signature, workflow=workflow) + + @exception_handler + def get_metainfo(self, name): + """ + Get meta info. + + :param name: the name of the meta info. + """ + self.setup_client() + + logging.info("Retrieving meta info for %s" % (name)) + ret = self.client.get_metainfo(name=name) + logging.info("Retrieved meta info for %s: %s" % (name, idds_mask(ret))) + return ret diff --git a/client/lib/idds/client/requestclient.py b/client/lib/idds/client/requestclient.py index 8a51185c..1dce2724 100644 --- a/client/lib/idds/client/requestclient.py +++ b/client/lib/idds/client/requestclient.py @@ -161,6 +161,27 @@ def abort_request(self, request_id, workload_id=None): r = self.get_request_response(url, type='PUT', data=None) return r + def close_request(self, request_id, workload_id=None): + """ + Close Request. + + :param request_id: the request. + :param kwargs: other attributes of the request. + + :raise exceptions if it's not updated successfully. + """ + path = self.REQUEST_BASEURL + path += "/close" + + if request_id is None: + request_id = 'null' + if workload_id is None: + workload_id = 'null' + + url = self.build_url(self.host, path=os.path.join(path, str(request_id), str(workload_id))) + r = self.get_request_response(url, type='PUT', data=None) + return r + def abort_request_task(self, request_id, workload_id=None, task_id=None): """ Abort Request task. diff --git a/common/lib/idds/common/constants.py b/common/lib/idds/common/constants.py index 1d527d19..578cdf71 100644 --- a/common/lib/idds/common/constants.py +++ b/common/lib/idds/common/constants.py @@ -101,6 +101,7 @@ class WorkflowType(IDDSEnum): Workflow = 0 iWorkflow = 1 iWork = 2 + iWorkflowLocal = 3 class WorkStatus(IDDSEnum): @@ -153,6 +154,7 @@ class RequestStatus(IDDSEnum): Building = 21 Built = 22 Throttling = 23 + ToClose = 24 class RequestLocking(IDDSEnum): @@ -196,6 +198,7 @@ class RequestType(IDDSEnum): HyperParameterOpt = 4 Derivation = 5 iWorkflow = 6 + iWorkflowLocal = 7 Other = 99 @@ -489,6 +492,7 @@ class CommandType(IDDSEnum): AbortRequest = 0 ResumeRequest = 1 ExpireRequest = 2 + CloseRequest = 3 class CommandStatus(IDDSEnum): diff --git a/common/lib/idds/common/event.py b/common/lib/idds/common/event.py index 58cf167c..8446df78 100644 --- a/common/lib/idds/common/event.py +++ b/common/lib/idds/common/event.py @@ -36,6 +36,7 @@ class EventType(IDDSEnum): AbortRequest = 12 ResumeRequest = 13 ExpireRequest = 14 + CloseRequest = 15 NewTransform = 20 UpdateTransform = 21 @@ -242,6 +243,20 @@ def to_json(self, strip=False): return ret +class CloseRequestEvent(Event): + def __init__(self, publisher_id=None, request_id=None, content=None, counter=1): + super(CloseRequestEvent, self).__init__(publisher_id, event_type=EventType.CloseRequest, content=content, counter=counter) + self._request_id = request_id + + def get_event_id(self): + return self._request_id + + def to_json(self, strip=False): + ret = super(CloseRequestEvent, self).to_json() + ret['request_id'] = self._request_id + return ret + + class ResumeRequestEvent(Event): def __init__(self, publisher_id=None, request_id=None, content=None, counter=1): super(ResumeRequestEvent, self).__init__(publisher_id, event_type=EventType.ResumeRequest, content=content, counter=counter) diff --git a/main/lib/idds/rest/v1/transforms.py b/main/lib/idds/rest/v1/transforms.py index 210d2983..72455e23 100644 --- a/main/lib/idds/rest/v1/transforms.py +++ b/main/lib/idds/rest/v1/transforms.py @@ -56,7 +56,7 @@ def post(self, request_id): if not req: raise exceptions.IDDSException("Request %s is not found" % request_id) - if req['request_type'] != RequestType.iWorkflow: + if req['request_type'] not in [RequestType.iWorkflow, RequestType.iWorkflowLocal]: raise exceptions.IDDSException("Request type %s doesn't support this operations" % req['request_type']) workflow = req['request_metadata']['workflow'] From 5c09c5b2264e5fc708e25cfc28b8e0ed0c7e0759 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Thu, 28 Mar 2024 16:30:13 +0100 Subject: [PATCH 3/8] add close request function --- main/lib/idds/rest/v1/app.py | 2 ++ main/lib/idds/rest/v1/requests.py | 55 +++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/main/lib/idds/rest/v1/app.py b/main/lib/idds/rest/v1/app.py index 0b97f582..08936792 100644 --- a/main/lib/idds/rest/v1/app.py +++ b/main/lib/idds/rest/v1/app.py @@ -34,6 +34,7 @@ from idds.rest.v1 import messages from idds.rest.v1 import ping from idds.rest.v1 import auth +from idds.rest.v1 import metainfo class LoggingMiddleware(object): @@ -70,6 +71,7 @@ def get_normal_blueprints(): # bps.append(monitor.get_blueprint()) bps.append(messages.get_blueprint()) bps.append(ping.get_blueprint()) + bps.append(metainfo.get_blueprint()) return bps diff --git a/main/lib/idds/rest/v1/requests.py b/main/lib/idds/rest/v1/requests.py index fdf2651e..651d5caa 100644 --- a/main/lib/idds/rest/v1/requests.py +++ b/main/lib/idds/rest/v1/requests.py @@ -362,6 +362,58 @@ def put(self, request_id, workload_id=None, task_id=None): return self.generate_http_response(HTTP_STATUS_CODE.OK, data=[(0, {'status': 0, 'message': 'Command registered successfully'})]) +class RequestClose(IDDSController): + """ Clsoe Request. """ + + def put(self, request_id, workload_id=None): + """ Close the request. + HTTP Success: + 200 OK + HTTP Error: + 400 Bad request + 404 Not Found + 500 Internal Error + """ + if request_id == 'null': + request_id = None + if workload_id == 'null': + workload_id = None + + try: + username = self.get_username() + reqs = get_requests(request_id=request_id, workload_id=workload_id, with_request=True) + + if not reqs: + return self.generate_http_response(HTTP_STATUS_CODE.OK, data=[(-1, {'status': -1, 'message': 'No match requests'})]) + + for req in reqs: + if req['username'] and req['username'] != username and not authenticate_is_super_user(username): + msg = "User %s has no permission to update request %s(user: %s)" % (username, req['request_id'], req['username']) + # raise exceptions.AuthenticationNoPermission(msg) + return self.generate_http_response(HTTP_STATUS_CODE.OK, data=[(-1, {'status': -1, 'message': msg})]) + except exceptions.AuthenticationNoPermission as error: + return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=error.__class__.__name__, exc_msg=error) + except Exception as error: + print(error) + print(format_exc()) + return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=exceptions.CoreException.__name__, exc_msg=error) + + try: + add_command(request_id=request_id, cmd_type=CommandType.CloseRequest, + workload_id=workload_id, cmd_content=None, username=username) + + except exceptions.NoObject as error: + return self.generate_http_response(HTTP_STATUS_CODE.NotFound, exc_cls=error.__class__.__name__, exc_msg=error) + except exceptions.IDDSException as error: + return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=error.__class__.__name__, exc_msg=error) + except Exception as error: + print(error) + print(format_exc()) + return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=exceptions.CoreException.__name__, exc_msg=error) + + return self.generate_http_response(HTTP_STATUS_CODE.OK, data=[(0, {'status': 0, 'message': 'Command registered successfully'})]) + + class RequestRetry(IDDSController): """ Retry Request. """ @@ -441,6 +493,9 @@ def get_blueprint(): bp.add_url_rule('/request/abort//', view_func=request_abort, methods=['put', ]) bp.add_url_rule('/request/abort///task_id', view_func=request_abort, methods=['put', ]) + request_close = RequestClose.as_view('request_close') + bp.add_url_rule('/request/close//', view_func=request_close, methods=['put', ]) + request_retry = RequestRetry.as_view('request_retry') bp.add_url_rule('/request/retry//', view_func=request_retry, methods=['put', ]) From c1745fe4e8754937f731f1b596cedf52f235a0d3 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Thu, 28 Mar 2024 16:31:00 +0100 Subject: [PATCH 4/8] add agent function to close request --- main/lib/idds/agents/clerk/clerk.py | 169 +++++++++++++++++++++++++--- 1 file changed, 156 insertions(+), 13 deletions(-) diff --git a/main/lib/idds/agents/clerk/clerk.py b/main/lib/idds/agents/clerk/clerk.py index 213cbf7c..9b42cea0 100644 --- a/main/lib/idds/agents/clerk/clerk.py +++ b/main/lib/idds/agents/clerk/clerk.py @@ -32,6 +32,7 @@ NewRequestEvent, UpdateRequestEvent, AbortRequestEvent, + CloseRequestEvent, ResumeRequestEvent, NewTransformEvent, UpdateTransformEvent, @@ -244,7 +245,7 @@ def get_running_requests(self): RequestStatus.ToExpire, RequestStatus.Expiring, RequestStatus.ToFinish, RequestStatus.ToForceFinish, RequestStatus.ToResume, RequestStatus.Resuming, - RequestStatus.Building] + RequestStatus.Building, RequestStatus.ToClose] reqs = core_requests.get_requests_by_status_type(status=req_status, time_period=None, min_request_id=min_request_id, locking=True, bulk_size=self.retrieve_bulk_size, @@ -320,6 +321,8 @@ def get_operation_requests(self): event = AbortRequestEvent(publisher_id=self.id, request_id=request_id, content=event_content) elif cmd_type in [CommandType.ResumeRequest]: event = ResumeRequestEvent(publisher_id=self.id, request_id=request_id, content=event_content) + elif cmd_type in [CommandType.CloseRequest]: + event = CloseRequestEvent(publisher_id=self.id, request_id=request_id, content=event_content) # elif cmd_status in [CommandStatus.Processing]: # event = UpdateRequestEvent(publisher_id=self.id, request_id=request_id, content=event_content) @@ -437,7 +440,10 @@ def generate_transform(self, req, work, build=False, iworkflow=False): transform_type = TransformType.Workflow try: work_type = work.get_work_type() - if work_type in [WorkflowType.iWorkflow]: + if work_type in [WorkflowType.iWorkflowLocal]: + # no need to generate transform + return None + elif work_type in [WorkflowType.iWorkflow]: transform_type = TransformType.iWorkflow elif work_type in [WorkflowType.iWork]: transform_type = TransformType.iWork @@ -689,7 +695,8 @@ def handle_new_request(self, req): # new_work.create_processing() transform = self.generate_transform(req, work) - transforms.append(transform) + if transform: + transforms.append(transform) self.logger.debug(log_pre + "Processing request(%s): new transforms: %s" % (req['request_id'], str(transforms))) # processing_metadata = req['processing_metadata'] @@ -745,7 +752,8 @@ def handle_new_irequest(self, req): transforms = [] transform = self.generate_transform(req, workflow) - transforms.append(transform) + if transform: + transforms.append(transform) self.logger.debug(log_pre + "Processing request(%s): new transforms: %s" % (req['request_id'], str(transforms))) ret_req = {'request_id': req['request_id'], @@ -923,7 +931,7 @@ def process_new_request(self, event): log_pre = self.get_log_prefix(req) if self.has_to_build_work(req): ret = self.handle_build_request(req) - elif req['request_type'] in [RequestType.iWorkflow]: + elif req['request_type'] in [RequestType.iWorkflow, RequestType.iWorkflowLocal]: ret = self.handle_new_irequest(req) else: ret = self.handle_new_request(req) @@ -1185,13 +1193,18 @@ def handle_update_irequest_real(self, req, event): failed_tfs += 1 req_status = RequestStatus.Transforming - if total_tfs == finished_tfs: - req_status = RequestStatus.Finished - elif total_tfs == finished_tfs + subfinished_tfs + failed_tfs: - if finished_tfs + subfinished_tfs > 0: - req_status = RequestStatus.SubFinished - else: - req_status = RequestStatus.Failed + if req['request_type'] in [RequestType.iWorkflowLocal] and total_tfs == 0: + workflow = req['request_metadata'].get('workflow', None) + if workflow and req['created_at'] + datetime.timedelta(seconds=workflow.max_walltime) < datetime.datetime.utcnow(): + req_status = RequestStatus.Finished + else: + if total_tfs == finished_tfs: + req_status = RequestStatus.Finished + elif total_tfs == finished_tfs + subfinished_tfs + failed_tfs: + if finished_tfs + subfinished_tfs > 0: + req_status = RequestStatus.SubFinished + else: + req_status = RequestStatus.Failed log_msg = log_pre + "ireqeust %s status: %s" % (req['request_id'], req_status) log_msg = log_msg + "(transforms: total %s, finished: %s, subfinished: %s, failed %s)" % (total_tfs, finished_tfs, subfinished_tfs, failed_tfs) @@ -1269,7 +1282,7 @@ def process_update_request(self, event): pro_ret = ReturnCode.Locked.value else: log_pre = self.get_log_prefix(req) - if req['request_type'] in [RequestType.iWorkflow]: + if req['request_type'] in [RequestType.iWorkflow, RequestType.iWorkflowLocal]: ret = self.handle_update_irequest(req, event=event) else: ret = self.handle_update_request(req, event=event) @@ -1373,6 +1386,14 @@ def process_abort_request(self, event): self.logger.info(log_pre + "process_abort_request result: %s" % str(ret)) self.update_request(ret) self.handle_command(event, cmd_status=CommandStatus.Failed, errors="Request is already terminated. Cannot be aborted") + elif req['request_type'] in [RequestType.iWorkflow, RequestType.iWorkflowLocal]: + # todo + ret = {'request_id': req['request_id'], + 'parameters': {'locking': RequestLocking.Idle, + 'status': req['status']} + } + self.update_request(ret) + self.handle_command(event, cmd_status=CommandStatus.Failed, errors="Not support abortion for iWorkflow") else: ret = self.handle_abort_request(req, event) self.logger.info(log_pre + "process_abort_request result: %s" % str(ret)) @@ -1425,6 +1446,116 @@ def process_abort_request(self, event): self.number_workers -= 1 return pro_ret + def handle_close_irequest(self, req, event): + """ + process close irequest + """ + try: + log_pre = self.get_log_prefix(req) + self.logger.info(log_pre + "handle_close_irequest event: %s" % str(event)) + + tfs = core_transforms.get_transforms(request_id=req['request_id']) + total_tfs, finished_tfs, subfinished_tfs, failed_tfs = 0, 0, 0, 0 + for tf in tfs: + total_tfs += 1 + if tf['status'] in [TransformStatus.Finished, TransformStatus.Built]: + finished_tfs += 1 + elif tf['status'] in [TransformStatus.SubFinished]: + subfinished_tfs += 1 + elif tf['status'] in [TransformStatus.Failed, TransformStatus.Cancelled, + TransformStatus.Suspended, TransformStatus.Expired]: + failed_tfs += 1 + else: + event = AbortTransformEvent(publisher_id=self.id, + transform_id=tf['transform_id'], + content=event._content) + self.event_bus.send(event) + + req_status = RequestStatus.Transforming + if req['request_type'] in [RequestType.iWorkflowLocal] and total_tfs == 0: + req_status = RequestStatus.Finished + else: + if total_tfs == finished_tfs: + req_status = RequestStatus.Finished + elif total_tfs == finished_tfs + subfinished_tfs + failed_tfs: + if finished_tfs + subfinished_tfs > 0: + req_status = RequestStatus.SubFinished + else: + req_status = RequestStatus.Failed + + log_msg = log_pre + "ireqeust %s status: %s" % (req['request_id'], req_status) + log_msg = log_msg + "(transforms: total %s, finished: %s, subfinished: %s, failed %s)" % (total_tfs, finished_tfs, subfinished_tfs, failed_tfs) + self.logger.debug(log_msg) + + parameters = {'status': req_status, + 'locking': RequestLocking.Idle, + 'request_metadata': req['request_metadata'] + } + parameters = self.load_poll_period(req, parameters) + + ret = {'request_id': req['request_id'], + 'parameters': parameters} + self.logger.info(log_pre + "Handle close irequest result: %s" % str(ret)) + return ret + + except Exception as ex: + self.logger.error(ex) + self.logger.error(traceback.format_exc()) + error = {'close_err': {'msg': truncate_string('%s' % (ex), length=200)}} + ret_req = {'request_id': req['request_id'], + 'parameters': {'status': RequestStatus.ToClose, + 'locking': RequestLocking.Idle, + 'errors': req['errors'] if req['errors'] else {}}} + ret_req['parameters']['errors'].update(error) + self.logger.info(log_pre + "handle_close_irequest exception result: %s" % str(ret_req)) + return ret_req + + def process_close_request(self, event): + self.number_workers += 1 + pro_ret = ReturnCode.Ok.value + try: + if event: + req = self.get_request(request_id=event._request_id, locking=True) + if not req: + self.logger.error("Cannot find request for event: %s" % str(event)) + pro_ret = ReturnCode.Locked.value + else: + log_pre = self.get_log_prefix(req) + self.logger.info(log_pre + "process_close_request event: %s" % str(event)) + + if req['status'] in [RequestStatus.Finished, RequestStatus.SubFinished, + RequestStatus.Failed, RequestStatus.Cancelled, + RequestStatus.Suspended, RequestStatus.Expired]: + ret = {'request_id': req['request_id'], + 'parameters': {'locking': RequestLocking.Idle, + 'errors': {'extra_msg': "Request is already terminated. Cannot be closed"}}} + if req['errors'] and 'msg' in req['errors']: + ret['parameters']['errors']['msg'] = req['errors']['msg'] + self.logger.info(log_pre + "process_abort_request result: %s" % str(ret)) + self.update_request(ret) + self.handle_command(event, cmd_status=CommandStatus.Failed, errors="Request is already terminated. Cannot be closed") + else: + if req['request_type'] in [RequestType.iWorkflow, RequestType.iWorkflowLocal]: + ret = self.handle_close_irequest(req, event=event) + self.update_request(ret) + else: + pass + # todo + + self.handle_command(event, cmd_status=CommandStatus.Processed, errors=None) + except AssertionError as ex: + self.logger.error("process_close_request, Failed to process event: %s" % str(event)) + self.logger.error(ex) + self.logger.error(traceback.format_exc()) + self.handle_command(event, cmd_status=CommandStatus.Processed, errors=str(ex)) + pro_ret = ReturnCode.Failed.value + except Exception as ex: + self.logger.error(ex) + self.logger.error(traceback.format_exc()) + pro_ret = ReturnCode.Failed.value + self.number_workers -= 1 + return pro_ret + def handle_resume_request(self, req): """ process resume request @@ -1478,6 +1609,14 @@ def process_resume_request(self, event): self.update_request(ret) self.handle_command(event, cmd_status=CommandStatus.Failed, errors="Request is already finished. Cannot be resumed") + elif req['request_type'] in [RequestType.iWorkflow, RequestType.iWorkflowLocal]: + # todo + ret = {'request_id': req['request_id'], + 'parameters': {'locking': RequestLocking.Idle, + 'status': req['status']} + } + self.update_request(ret) + self.handle_command(event, cmd_status=CommandStatus.Failed, errors="Not support to reusme for iWorkflow") else: ret = self.handle_resume_request(req) self.logger.info(log_pre + "process_resume_request result: %s" % str(ret)) @@ -1531,6 +1670,10 @@ def init_event_function_map(self): EventType.ResumeRequest: { 'pre_check': self.is_ok_to_run_more_requests, 'exec_func': self.process_resume_request + }, + EventType.CloseRequest: { + 'pre_check': self.is_ok_to_run_more_requests, + 'exec_func': self.process_close_request } } From 53b3dbda8911e8c08ad870fd815921d191a7202c Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Thu, 28 Mar 2024 16:32:41 +0100 Subject: [PATCH 5/8] move broker info to idds server with meta info; optimize idds workflow/work to support different env --- workflow/lib/idds/iworkflow/asyncresult.py | 19 ++- workflow/lib/idds/iworkflow/work.py | 20 ++- workflow/lib/idds/iworkflow/workflow.py | 171 ++++++++++++++++++--- 3 files changed, 182 insertions(+), 28 deletions(-) diff --git a/workflow/lib/idds/iworkflow/asyncresult.py b/workflow/lib/idds/iworkflow/asyncresult.py index 3e92b09f..c02a422c 100644 --- a/workflow/lib/idds/iworkflow/asyncresult.py +++ b/workflow/lib/idds/iworkflow/asyncresult.py @@ -127,6 +127,7 @@ def __init__(self, work_context, name=None, wait_num=1, wait_keys=[], group_kwar if internal_id: self.internal_id = internal_id self._work_context = work_context + self._work_context.init_brokers() self._name = name self._queue = Queue() @@ -152,6 +153,8 @@ def __init__(self, work_context, name=None, wait_num=1, wait_keys=[], group_kwar self._timeout = timeout + self._nologs = False + @property def logger(self): return logging.getLogger(self.__class__.__name__) @@ -193,8 +196,10 @@ def results(self): if self._bad_results: self.logger.error("Received bad results: %s" % str(self._bad_results)) - self.logger.debug("_results: %s, bad_results: %s" % (str(self._results), str(self._bad_results))) - self.logger.debug("wait_keys: %s, wait_num: %s" % (str(self.wait_keys), self._wait_num)) + if not self._nologs: + self.logger.debug("_results: %s, bad_results: %s" % (str(self._results), str(self._bad_results))) + self.logger.debug("wait_keys: %s, wait_num: %s" % (str(self.wait_keys), self._wait_num)) + rets_dict = {} for result in self._results: key = result['key'] @@ -381,9 +386,13 @@ def run_subscriber(self): self.logger.error("run subscriber failed with error: %s" % str(ex)) self.logger.error(traceback.format_exc()) - def get_results(self): + def get_results(self, nologs=False): + old_nologs = self._nologs + self._nologs = nologs rets = self.results - self.logger.debug('results: %s' % str(rets)) + if not self._nologs: + self.logger.debug('results: %s' % str(rets)) + self._nologs = old_nologs return rets def get_results_percentage(self): @@ -415,7 +424,7 @@ def wait_results(self, timeout=None, force_return_results=False): self.logger.info("waiting for results") try: while not get_results and not self._graceful_stop.is_set(): - self.get_results() + self.get_results(nologs=True) percent = self.get_results_percentage() if time.time() - time_log > 600: # 10 minutes self.logger.info("waiting for results: %s (number of wrong keys: %s)" % (percent, self._num_wrong_keys)) diff --git a/workflow/lib/idds/iworkflow/work.py b/workflow/lib/idds/iworkflow/work.py index 38e343ca..0ae3e21b 100644 --- a/workflow/lib/idds/iworkflow/work.py +++ b/workflow/lib/idds/iworkflow/work.py @@ -287,6 +287,9 @@ def map_results(self, value): def get_idds_server(self): return self._workflow_context.get_idds_server() + def init_brokers(self): + self._workflow_context.init_brokers() + def initialize(self): return self._workflow_context.initialize() @@ -300,7 +303,20 @@ def setup(self): """ :returns command: `str` to setup the workflow. """ - return self._workflow_context.setup() + if not self.init_env: + return self._workflow_context.setup() + + global_set_up = self._workflow_context.global_setup() + init_env = self.init_env + ret = None + if global_set_up: + ret = global_set_up + if init_env: + if ret: + ret = ret + "; " + init_env + else: + ret = init_env + return ret class Work(Base): @@ -536,7 +552,7 @@ def group_parameters(self, value): raise Exception("Not allwed to update group parameters") def get_work_tag(self): - return 'iWork' + return WorkflowType.iWork.name def get_work_type(self): return WorkflowType.iWork diff --git a/workflow/lib/idds/iworkflow/workflow.py b/workflow/lib/idds/iworkflow/workflow.py index 7f6d2358..c92664e1 100644 --- a/workflow/lib/idds/iworkflow/workflow.py +++ b/workflow/lib/idds/iworkflow/workflow.py @@ -51,11 +51,11 @@ def get_current_workflow(cls): class WorkflowContext(Context): - def __init__(self, name=None, service='panda', source_dir=None, distributed=True, init_env=None): + def __init__(self, name=None, service='panda', source_dir=None, type=WorkflowType.iWorkflow, distributed=True, max_walltime=24 * 3600, init_env=None): super(WorkflowContext, self).__init__() self._service = service # panda, idds, sharefs self._request_id = None - self._type = WorkflowType.iWorkflow + self._type = type # self.idds_host = None # self.idds_async_host = None @@ -77,12 +77,12 @@ def __init__(self, name=None, service='panda', source_dir=None, distributed=True self._priority = 500 self._core_count = 1 self._total_memory = 1000 # MB - self._max_walltime = 7 * 24 * 3600 + self._max_walltime = max_walltime self._max_attempt = 5 self._username = None self._userdn = None - self._type = WorkflowType.iWorkflow + self._type = type self._lifetime = 7 * 24 * 3600 self._workload_id = None self._request_id = None @@ -96,7 +96,7 @@ def __init__(self, name=None, service='panda', source_dir=None, distributed=True self._broker_password = None self._broker_destination = None - self.init_brokers() + # self.init_brokers() self._token = str(uuid.uuid4()) @@ -328,6 +328,59 @@ def init_brokers(self): self._broker_destination = broker_destination self._broker_initialized = True + else: + broker_info = self.get_broker_info() + if broker_info: + brokers = broker_info.get("brokers", None) + broker_destination = broker_info.get("broker_destination", None) + broker_timeout = broker_info.get("broker_timeout", 180) + broker_username = broker_info.get("broker_username", None) + broker_password = broker_info.get("broker_password", None) + if brokers and broker_destination and broker_username and broker_password: + self._brokers = brokers + self._broker_timeout = int(broker_timeout) + self._broker_username = broker_username + self._broker_password = broker_password + self._broker_destination = broker_destination + + self._broker_initialized = True + + def get_broker_info_from_idds_server(self): + """ + Get broker infomation from the iDDS server. + + :raise Exception when failing to get broker information. + """ + # iDDS ClientManager + from idds.client.clientmanager import ClientManager + + client = ClientManager(host=self.get_idds_server()) + ret = client.get_metainfo(name='asyncresult_config') + + return ret + + def get_broker_info_from_panda_server(self): + """ + Get broker infomation from the iDDS server through PanDA service. + + :raise Exception when failing to get broker information. + """ + import idds.common.utils as idds_utils + import pandaclient.idds_api as idds_api + + idds_server = self.get_idds_server() + client = idds_api.get_api(idds_utils.json_dumps, + idds_host=idds_server, + compress=True, + manager=True) + ret = client.get_metainfo(name='asyncresult_config') + + return ret + + def get_broker_info(self): + if self.service == 'panda': + return self.get_broker_info_from_panda_server() + return self.get_broker_info_from_idds_server() def init_idds(self): if not self._idds_initialized: @@ -366,10 +419,7 @@ def initialize(self): if 'PANDA_CONFIG_ROOT' not in os.environ: os.environ['PANDA_CONFIG_ROOT'] = os.getcwd() - def setup(self): - """ - :returns command: `str` to setup the workflow. - """ + def global_setup(self): if self.service == 'panda': set_up = self.setup_panda() elif self.service == 'idds': @@ -378,6 +428,13 @@ def setup(self): set_up = self.setup_sharefs() else: set_up = self.setup_sharefs() + return set_up + + def setup(self): + """ + :returns command: `str` to setup the workflow. + """ + set_up = self.global_setup() init_env = self.init_env ret = None @@ -600,8 +657,9 @@ def prepare(self): class Workflow(Base): - def __init__(self, func=None, service='panda', context=None, source_dir=None, distributed=True, - args=None, kwargs={}, group_kwargs=[], update_kwargs=None, init_env=None, is_unique_func_name=False): + def __init__(self, func=None, service='panda', context=None, source_dir=None, local=False, distributed=True, + args=None, kwargs={}, group_kwargs=[], update_kwargs=None, init_env=None, is_unique_func_name=False, + max_walltime=24 * 3600): """ Init a workflow. """ @@ -619,10 +677,16 @@ def __init__(self, func=None, service='panda', context=None, source_dir=None, di if self._name: self._name = self._name + "_" + datetime.datetime.utcnow().strftime("%Y_%m_%d_%H_%M_%S") source_dir = self.get_source_dir(self._func, source_dir) + + workflow_type = WorkflowType.iWorkflow + if local: + workflow_type = WorkflowType.iWorkflowLocal + if context is not None: self._context = context else: - self._context = WorkflowContext(name=self._name, service=service, source_dir=source_dir, distributed=distributed, init_env=init_env) + self._context = WorkflowContext(name=self._name, service=service, type=workflow_type, source_dir=source_dir, + distributed=distributed, init_env=init_env, max_walltime=max_walltime) @property def service(self): @@ -794,10 +858,10 @@ def token(self, value): self._context.token = value def get_work_tag(self): - return 'iWorkflow' + return self._context.type.name def get_work_type(self): - return WorkflowType.iWorkflow + return self._context.type def get_work_name(self): return self._name @@ -896,6 +960,54 @@ def submit(self): return None + def close_to_idds_server(self, request_id): + """ + close the workflow to the iDDS server. + + :param request_id: the workflow id. + :raise Exception when failing to close the workflow. + """ + # iDDS ClientManager + from idds.client.clientmanager import ClientManager + + client = ClientManager(host=self._context.get_idds_server()) + request_id = client.close(request_id) + + logging.info("Close request id=%s to iDDS server", str(request_id)) + return request_id + + def close_to_panda_server(self, request_id): + """ + close the workflow to the iDDS server through PanDA service. + + :param request_id: the workflow id. + :raise Exception when failing to closet the workflow. + """ + import idds.common.utils as idds_utils + import pandaclient.idds_api as idds_api + + idds_server = self._context.get_idds_server() + client = idds_api.get_api(idds_utils.json_dumps, + idds_host=idds_server, + compress=True, + manager=True) + request_id = client.close(request_id) + + logging.info("Close request id=%s through PanDA-iDDS", str(request_id)) + return request_id + + def close(self): + """ + close the workflow to the iDDS server. + + :raise Exception when failing to close the workflow. + """ + if self._context.request_id is not None: + if self.service == 'panda': + self.close_to_panda_server(self._context.request_id) + else: + self.close_to_idds_server(self._context.request_id) + def setup(self): """ :returns command: `str` to setup the workflow. @@ -969,7 +1081,9 @@ def __enter__(self): return self def __exit__(self, _type, _value, _tb): - WorkflowCanvas.pop_managed_workflow() + w = WorkflowCanvas.pop_managed_workflow() + if w is not None: + w.close() # /Context Manager ---------------------------------------------- @@ -999,9 +1113,10 @@ def get_func_name(self): # foo = workflow(arg)(foo) -def workflow(func=None, *, lazy=False, service='panda', source_dir=None, primary=False, distributed=True): +def workflow(func=None, *, local=False, service='idds', source_dir=None, primary=False, queue=None, site=None, cloud=None, max_walltime=24 * 3600, distributed=True): if func is None: - return functools.partial(workflow, lazy=lazy) + return functools.partial(workflow, local=local, service=service, source_dir=source_dir, primary=primary, queue=queue, site=site, cloud=cloud, + max_walltime=max_walltime, distributed=distributed) if 'IDDS_IWORKFLOW_LOAD_WORKFLOW' in os.environ: return func @@ -1009,12 +1124,26 @@ def workflow(func=None, *, lazy=False, service='panda', source_dir=None, primary @functools.wraps(func) def wrapper(*args, **kwargs): try: - f = Workflow(func, service=service, source_dir=source_dir, distributed=distributed) + f = Workflow(func, service=service, source_dir=source_dir, local=local, max_walltime=max_walltime, distributed=distributed) + f.queue = queue + f.site = site + f.cloud = cloud - if lazy: - return f + logging.info("Prepare workflow") + f.prepare() + logging.info("Prepared workflow") + + logging.info("Registering workflow") + f.submit() - return f.run() + if not local: + logging.info("Run workflow at remote sites") + return f + else: + logging.info("Run workflow locally") + with f: + ret = f.run() + return ret except Exception as ex: logging.error("Failed to run workflow %s: %s" % (func, ex)) raise ex From edf43737c83ad5aa596e418731f9e7a0ac005498 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Thu, 28 Mar 2024 16:33:35 +0100 Subject: [PATCH 6/8] mask password or token information --- common/lib/idds/common/utils.py | 43 ++++++++++++++++++++++++++++++++- 1 file changed, 42 insertions(+), 1 deletion(-) diff --git a/common/lib/idds/common/utils.py b/common/lib/idds/common/utils.py index e6d8d9f7..5dbe9520 100644 --- a/common/lib/idds/common/utils.py +++ b/common/lib/idds/common/utils.py @@ -34,7 +34,7 @@ from typing import Any, Callable from idds.common.config import (config_has_section, config_has_option, - config_get, config_get_bool) + config_get, config_get_bool, config_get_int) from idds.common.constants import (IDDSEnum, RequestType, RequestStatus, TransformType, TransformStatus, CollectionType, CollectionRelationType, CollectionStatus, @@ -148,6 +148,37 @@ def get_rest_cacher_dir(): raise Exception("cacher_dir is not defined or it doesn't exist") +def get_asyncresult_config(): + broker_type = None + brokers = None + broker_destination = None + broker_timeout = 360 + broker_username = None + broker_password = None + broker_x509 = None + + if config_has_section('asyncresult'): + if config_has_option('asyncresult', 'broker_type'): + broker_type = config_get('asyncresult', 'broker_type') + if config_has_option('asyncresult', 'brokers'): + brokers = config_get('asyncresult', 'brokers') + if config_has_option('asyncresult', 'broker_destination'): + broker_destination = config_get('asyncresult', 'broker_destination') + if config_has_option('asyncresult', 'broker_timeout'): + broker_timeout = config_get_int('asyncresult', 'broker_timeout') + if config_has_option('asyncresult', 'broker_username'): + broker_username = config_get('asyncresult', 'broker_username') + if config_has_option('asyncresult', 'broker_password'): + broker_password = config_get('asyncresult', 'broker_password') + if config_has_option('asyncresult', 'broker_x509'): + broker_x509 = config_get('asyncresult', 'broker_x509') + + ret = {'broker_type': broker_type, 'brokers': brokers, 'broker_destination': broker_destination, + 'broker_timeout': broker_timeout, 'broker_username': broker_username, 'broker_password': broker_password, + 'broker_x509': broker_x509} + return ret + + def str_to_date(string): """ Converts a string to the corresponding datetime value. @@ -901,3 +932,13 @@ def get_unique_id_for_dict(dict_): ret = hashlib.sha1(json.dumps(dict_, sort_keys=True).encode()).hexdigest() # logging.debug("get_unique_id_for_dict, type: %s: %s, ret: %s" % (type(dict_), dict_, ret)) return ret + + +def idds_mask(dict_): + ret = {} + for k in dict_: + if 'pass' in k or 'password' in k or 'passwd' in k or 'token' in k: + ret[k] = "***" + else: + ret[k] = dict_[k] + return ret From 5b02498c750e41e8d1aa88faea0ec80c7a962500 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Thu, 28 Mar 2024 16:34:35 +0100 Subject: [PATCH 7/8] mask password or token information --- common/lib/idds/common/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/lib/idds/common/utils.py b/common/lib/idds/common/utils.py index 5dbe9520..1a73cde2 100644 --- a/common/lib/idds/common/utils.py +++ b/common/lib/idds/common/utils.py @@ -937,7 +937,7 @@ def get_unique_id_for_dict(dict_): def idds_mask(dict_): ret = {} for k in dict_: - if 'pass' in k or 'password' in k or 'passwd' in k or 'token' in k: + if 'pass' in k or 'password' in k or 'passwd' in k or 'token' in k or 'security' in k or 'secure' in k: ret[k] = "***" else: ret[k] = dict_[k] From 8192e66ba0c6a7dcbc5a533d3a9c8b5796ad2c01 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Thu, 28 Mar 2024 16:35:42 +0100 Subject: [PATCH 8/8] upgrade database workflow tag --- main/etc/sql/oracle_update.sql | 3 +++ main/lib/idds/tests/panda_test.py | 3 ++- .../idds/tests/test_iworkflow/test_iworkflow.py | 4 ++-- main/tools/env/setup_panda.sh | 14 +++++++------- monitor/data/conf.js | 12 ++++++------ workflow/tools/make/make.sh | 2 +- 6 files changed, 21 insertions(+), 17 deletions(-) diff --git a/main/etc/sql/oracle_update.sql b/main/etc/sql/oracle_update.sql index 4e32ed30..f738de11 100644 --- a/main/etc/sql/oracle_update.sql +++ b/main/etc/sql/oracle_update.sql @@ -473,3 +473,6 @@ alter table TRANSFORMS add (parent_transform_id NUMBER(12)); alter table TRANSFORMS add (previous_transform_id NUMBER(12)); alter table TRANSFORMS add (current_processing_id NUMBER(12)); alter table PROCESSINGS add (processing_type NUMBER(2)); + +--- 20240327 +alter table requests modify (transform_tag VARCHAR2(20)); diff --git a/main/lib/idds/tests/panda_test.py b/main/lib/idds/tests/panda_test.py index 9f0a52e3..1e51409c 100644 --- a/main/lib/idds/tests/panda_test.py +++ b/main/lib/idds/tests/panda_test.py @@ -53,7 +53,8 @@ task_ids = [i for i in range(8752, 8958)] task_ids = [168645, 168638] task_ids = [168747, 168761, 168763] -task_ids = [168770] +task_ids = [13413] +task_ids = [168859, 168861, 168862] for task_id in task_ids: print("Killing %s" % task_id) ret = Client.killTask(task_id, verbose=True) diff --git a/main/lib/idds/tests/test_iworkflow/test_iworkflow.py b/main/lib/idds/tests/test_iworkflow/test_iworkflow.py index 4482d873..258c908c 100644 --- a/main/lib/idds/tests/test_iworkflow/test_iworkflow.py +++ b/main/lib/idds/tests/test_iworkflow/test_iworkflow.py @@ -119,8 +119,8 @@ def test_create_archive_file(wf): # wf = Workflow(func=test_workflow, service='idds', distributed=False) wf = Workflow(func=test_workflow, service='idds') - # wf.queue = 'BNL_OSG_2' - wf.queue = 'FUNCX_TEST' + wf.queue = 'BNL_OSG_2' + # wf.queue = 'FUNCX_TEST' wf.cloud = 'US' wf_json = json_dumps(wf) diff --git a/main/tools/env/setup_panda.sh b/main/tools/env/setup_panda.sh index 2f8c9a9d..6c18c1e0 100644 --- a/main/tools/env/setup_panda.sh +++ b/main/tools/env/setup_panda.sh @@ -73,16 +73,16 @@ else # export IDDS_HOST=https://aipanda015.cern.ch:443/idds # dev - # export IDDS_HOST=https://aipanda104.cern.ch:443/idds + export IDDS_HOST=https://aipanda104.cern.ch:443/idds # doma - export IDDS_HOST=https://aipanda105.cern.ch:443/idds + # export IDDS_HOST=https://aipanda105.cern.ch:443/idds - export IDDS_BROKERS=atlas-test-mb.cern.ch:61013 - export IDDS_BROKER_DESTINATION=/topic/doma.idds - export IDDS_BROKER_USERNAME=domaidds - export IDDS_BROKER_PASSWORD=1d25yeft6krJ1HFH - export IDDS_BROKER_TIMEOUT=360 + # export IDDS_BROKERS=atlas-test-mb.cern.ch:61013 + # export IDDS_BROKER_DESTINATION=/topic/doma.idds + # export IDDS_BROKER_USERNAME=domaidds + # export IDDS_BROKER_PASSWORD=1d25yeft6krJ1HFH + # export IDDS_BROKER_TIMEOUT=360 PANDA_QUEUE=BNL_OSG_2 PANDA_WORKING_GROUP=EIC diff --git a/monitor/data/conf.js b/monitor/data/conf.js index bc520561..18a92f16 100644 --- a/monitor/data/conf.js +++ b/monitor/data/conf.js @@ -1,9 +1,9 @@ var appConfig = { - 'iddsAPI_request': "https://lxplus927.cern.ch:443/idds/monitor_request/null/null", - 'iddsAPI_transform': "https://lxplus927.cern.ch:443/idds/monitor_transform/null/null", - 'iddsAPI_processing': "https://lxplus927.cern.ch:443/idds/monitor_processing/null/null", - 'iddsAPI_request_detail': "https://lxplus927.cern.ch:443/idds/monitor/null/null/true/false/false", - 'iddsAPI_transform_detail': "https://lxplus927.cern.ch:443/idds/monitor/null/null/false/true/false", - 'iddsAPI_processing_detail': "https://lxplus927.cern.ch:443/idds/monitor/null/null/false/false/true" + 'iddsAPI_request': "https://lxplus939.cern.ch:443/idds/monitor_request/null/null", + 'iddsAPI_transform': "https://lxplus939.cern.ch:443/idds/monitor_transform/null/null", + 'iddsAPI_processing': "https://lxplus939.cern.ch:443/idds/monitor_processing/null/null", + 'iddsAPI_request_detail': "https://lxplus939.cern.ch:443/idds/monitor/null/null/true/false/false", + 'iddsAPI_transform_detail': "https://lxplus939.cern.ch:443/idds/monitor/null/null/false/true/false", + 'iddsAPI_processing_detail': "https://lxplus939.cern.ch:443/idds/monitor/null/null/false/false/true" } diff --git a/workflow/tools/make/make.sh b/workflow/tools/make/make.sh index 5948a439..60ce6ec4 100644 --- a/workflow/tools/make/make.sh +++ b/workflow/tools/make/make.sh @@ -56,7 +56,7 @@ cd $workdir mkdir lib_py # for libname in idds pandaclient pandatools tabulate pyjwt requests urllib3 argcomplete cryptography packaging anytree networkx; do # for libname in idds pandaclient pandatools tabulate jwt requests urllib3 argcomplete cryptography packaging stomp cffi charset_normalizer docopt.py idna pycparser six.py websocket _cffi_backend*; do -for libname in idds pandaclient pandatools tabulate requests urllib3 argcomplete stomp websocket charset_normalizer idna certifi; do +for libname in idds pandaclient pandatools tabulate requests urllib3 argcomplete stomp websocket charset_normalizer idna certifi packaging; do echo cp -fr ${python_lib_path}/$libname lib_py cp -fr ${python_lib_path}/$libname lib_py done