Skip to content

Commit

Permalink
Merge pull request #288 from HSF/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
wguanicedew authored Mar 28, 2024
2 parents bdcc815 + 763ac3b commit f28c8f2
Show file tree
Hide file tree
Showing 21 changed files with 642 additions and 63 deletions.
3 changes: 2 additions & 1 deletion client/lib/idds/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@
from idds.client.messageclient import MessageClient
from idds.client.pingclient import PingClient
from idds.client.authclient import AuthClient
from idds.client.metainfoclient import MetaInfoClient


warnings.filterwarnings("ignore")


class Client(RequestClient, TransformClient, CatalogClient, CacherClient, HPOClient, LogsClient, MessageClient, PingClient, AuthClient):
class Client(RequestClient, TransformClient, CatalogClient, CacherClient, HPOClient, LogsClient, MessageClient, PingClient, AuthClient, MetaInfoClient):

"""Main client class for IDDS rest callings."""

Expand Down
43 changes: 41 additions & 2 deletions client/lib/idds/client/clientmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# http://www.apache.org/licenses/LICENSE-2.0OA
#
# Authors:
# - Wen Guan, <[email protected]>, 2020 - 2023
# - Wen Guan, <[email protected]>, 2020 - 2024


"""
Expand All @@ -32,7 +32,7 @@


from idds.common.authentication import OIDCAuthentication, OIDCAuthenticationUtils
from idds.common.utils import setup_logging, get_proxy_path
from idds.common.utils import setup_logging, get_proxy_path, idds_mask

from idds.client.version import release_version
from idds.client.client import Client
Expand Down Expand Up @@ -444,6 +444,13 @@ def submit(self, workflow, username=None, userdn=None, use_dataset_name=False):
priority = workflow.priority
if priority is None:
priority = 0
elif workflow.type in [WorkflowType.iWorkflowLocal]:
scope = 'iworkflowLocal'
request_type = RequestType.iWorkflowLocal
transform_tag = workflow.get_work_tag()
priority = workflow.priority
if priority is None:
priority = 0
except Exception:
pass

Expand Down Expand Up @@ -621,6 +628,24 @@ def abort_task(self, request_id=None, workload_id=None, task_id=None):
ret = self.client.abort_request_task(request_id=request_id, workload_id=workload_id, task_id=task_id)
return ret

@exception_handler
def close(self, request_id=None, workload_id=None):
"""
Close requests.
:param workload_id: the workload id.
:param request_id: the request.
"""
self.setup_client()

if request_id is None and workload_id is None:
logging.error("Both request_id and workload_id are None. One of them should not be None")
return (-1, "Both request_id and workload_id are None. One of them should not be None")

ret = self.client.close_request(request_id=request_id, workload_id=workload_id)
# return (-1, 'No matching requests')
return ret

@exception_handler
def retry(self, request_id=None, workload_id=None):
"""
Expand Down Expand Up @@ -840,3 +865,17 @@ def update_build_request(self, request_id, signature, workflow):
self.setup_client()

return self.client.update_build_request(request_id=request_id, signature=signature, workflow=workflow)

@exception_handler
def get_metainfo(self, name):
"""
Get meta info.
:param name: the name of the meta info.
"""
self.setup_client()

logging.info("Retrieving meta info for %s" % (name))
ret = self.client.get_metainfo(name=name)
logging.info("Retrieved meta info for %s: %s" % (name, idds_mask(ret)))
return ret
49 changes: 49 additions & 0 deletions client/lib/idds/client/metainfoclient.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#!/usr/bin/env python
#
# Licensed under the Apache License, Version 2.0 (the "License");
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0OA
#
# Authors:
# - Wen Guan, <[email protected]>, 2024


"""
Metainfo Rest client to access IDDS system.
"""

import os

from idds.client.base import BaseRestClient


class MetaInfoClient(BaseRestClient):

"""MetaInfo Rest client"""

METAINFO_BASEURL = 'metainfo'

def __init__(self, host=None, auth=None, timeout=None):
"""
Constructor of the BaseRestClient.
:param host: the address of the IDDS server.
:param client_proxy: the client certificate proxy.
:param timeout: timeout in seconds.
"""
super(MetaInfoClient, self).__init__(host=host, auth=auth, timeout=timeout)

def get_metainfo(self, name):
"""
Get meta info
:param name: name to select different meta info
:raise exceptions if it's not got successfully.
"""
path = self.METAINFO_BASEURL
url = self.build_url(self.host, path=os.path.join(path, str(name)))

meta_info = self.get_request_response(url, type='GET')
return meta_info
21 changes: 21 additions & 0 deletions client/lib/idds/client/requestclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,27 @@ def abort_request(self, request_id, workload_id=None):
r = self.get_request_response(url, type='PUT', data=None)
return r

def close_request(self, request_id, workload_id=None):
"""
Close Request.
:param request_id: the request.
:param kwargs: other attributes of the request.
:raise exceptions if it's not updated successfully.
"""
path = self.REQUEST_BASEURL
path += "/close"

if request_id is None:
request_id = 'null'
if workload_id is None:
workload_id = 'null'

url = self.build_url(self.host, path=os.path.join(path, str(request_id), str(workload_id)))
r = self.get_request_response(url, type='PUT', data=None)
return r

def abort_request_task(self, request_id, workload_id=None, task_id=None):
"""
Abort Request task.
Expand Down
4 changes: 4 additions & 0 deletions common/lib/idds/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class WorkflowType(IDDSEnum):
Workflow = 0
iWorkflow = 1
iWork = 2
iWorkflowLocal = 3


class WorkStatus(IDDSEnum):
Expand Down Expand Up @@ -153,6 +154,7 @@ class RequestStatus(IDDSEnum):
Building = 21
Built = 22
Throttling = 23
ToClose = 24


class RequestLocking(IDDSEnum):
Expand Down Expand Up @@ -196,6 +198,7 @@ class RequestType(IDDSEnum):
HyperParameterOpt = 4
Derivation = 5
iWorkflow = 6
iWorkflowLocal = 7
Other = 99


Expand Down Expand Up @@ -489,6 +492,7 @@ class CommandType(IDDSEnum):
AbortRequest = 0
ResumeRequest = 1
ExpireRequest = 2
CloseRequest = 3


class CommandStatus(IDDSEnum):
Expand Down
15 changes: 15 additions & 0 deletions common/lib/idds/common/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class EventType(IDDSEnum):
AbortRequest = 12
ResumeRequest = 13
ExpireRequest = 14
CloseRequest = 15

NewTransform = 20
UpdateTransform = 21
Expand Down Expand Up @@ -242,6 +243,20 @@ def to_json(self, strip=False):
return ret


class CloseRequestEvent(Event):
def __init__(self, publisher_id=None, request_id=None, content=None, counter=1):
super(CloseRequestEvent, self).__init__(publisher_id, event_type=EventType.CloseRequest, content=content, counter=counter)
self._request_id = request_id

def get_event_id(self):
return self._request_id

def to_json(self, strip=False):
ret = super(CloseRequestEvent, self).to_json()
ret['request_id'] = self._request_id
return ret


class ResumeRequestEvent(Event):
def __init__(self, publisher_id=None, request_id=None, content=None, counter=1):
super(ResumeRequestEvent, self).__init__(publisher_id, event_type=EventType.ResumeRequest, content=content, counter=counter)
Expand Down
43 changes: 42 additions & 1 deletion common/lib/idds/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from typing import Any, Callable

from idds.common.config import (config_has_section, config_has_option,
config_get, config_get_bool)
config_get, config_get_bool, config_get_int)
from idds.common.constants import (IDDSEnum, RequestType, RequestStatus,
TransformType, TransformStatus,
CollectionType, CollectionRelationType, CollectionStatus,
Expand Down Expand Up @@ -148,6 +148,37 @@ def get_rest_cacher_dir():
raise Exception("cacher_dir is not defined or it doesn't exist")


def get_asyncresult_config():
broker_type = None
brokers = None
broker_destination = None
broker_timeout = 360
broker_username = None
broker_password = None
broker_x509 = None

if config_has_section('asyncresult'):
if config_has_option('asyncresult', 'broker_type'):
broker_type = config_get('asyncresult', 'broker_type')
if config_has_option('asyncresult', 'brokers'):
brokers = config_get('asyncresult', 'brokers')
if config_has_option('asyncresult', 'broker_destination'):
broker_destination = config_get('asyncresult', 'broker_destination')
if config_has_option('asyncresult', 'broker_timeout'):
broker_timeout = config_get_int('asyncresult', 'broker_timeout')
if config_has_option('asyncresult', 'broker_username'):
broker_username = config_get('asyncresult', 'broker_username')
if config_has_option('asyncresult', 'broker_password'):
broker_password = config_get('asyncresult', 'broker_password')
if config_has_option('asyncresult', 'broker_x509'):
broker_x509 = config_get('asyncresult', 'broker_x509')

ret = {'broker_type': broker_type, 'brokers': brokers, 'broker_destination': broker_destination,
'broker_timeout': broker_timeout, 'broker_username': broker_username, 'broker_password': broker_password,
'broker_x509': broker_x509}
return ret


def str_to_date(string):
"""
Converts a string to the corresponding datetime value.
Expand Down Expand Up @@ -901,3 +932,13 @@ def get_unique_id_for_dict(dict_):
ret = hashlib.sha1(json.dumps(dict_, sort_keys=True).encode()).hexdigest()
# logging.debug("get_unique_id_for_dict, type: %s: %s, ret: %s" % (type(dict_), dict_, ret))
return ret


def idds_mask(dict_):
ret = {}
for k in dict_:
if 'pass' in k or 'password' in k or 'passwd' in k or 'token' in k or 'security' in k or 'secure' in k:
ret[k] = "***"
else:
ret[k] = dict_[k]
return ret
3 changes: 3 additions & 0 deletions main/etc/sql/oracle_update.sql
Original file line number Diff line number Diff line change
Expand Up @@ -473,3 +473,6 @@ alter table TRANSFORMS add (parent_transform_id NUMBER(12));
alter table TRANSFORMS add (previous_transform_id NUMBER(12));
alter table TRANSFORMS add (current_processing_id NUMBER(12));
alter table PROCESSINGS add (processing_type NUMBER(2));

--- 20240327
alter table requests modify (transform_tag VARCHAR2(20));
Loading

0 comments on commit f28c8f2

Please sign in to comment.