From 19c9c527677655d6ff989ff334c893abf4901bde Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Tue, 6 Aug 2024 10:27:43 +0200 Subject: [PATCH] core and orm conditions --- main/lib/idds/core/conditions.py | 104 +++++++++++++++++++ main/lib/idds/orm/conditions.py | 167 +++++++++++++++++++++++++++++++ 2 files changed, 271 insertions(+) create mode 100644 main/lib/idds/core/conditions.py create mode 100644 main/lib/idds/orm/conditions.py diff --git a/main/lib/idds/core/conditions.py b/main/lib/idds/core/conditions.py new file mode 100644 index 00000000..6df389b5 --- /dev/null +++ b/main/lib/idds/core/conditions.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0OA +# +# Authors: +# - Wen Guan, , 2024 + + +""" +operations related to Conditions. +""" + + +from idds.common.constants import ConditionStatus +from idds.orm.base.session import read_session, transactional_session +from idds.orm import conditions as orm_conditions + + +@transactional_session +def add_condition(request_id, internal_id, status=ConditionStatus.WaitForTrigger, + substatus=None, is_loop=False, loop_index=None, cloned_from=None, + evaluate_result=None, previous_transforms=None, following_transforms=None, + condition=None, session=None): + """ + Add a condition. + + :param request_id: The request id. + :param intenal_id: The internal id. + :param status: The status about the condition. + :param substatus: The substatus about the condition. + :param is_loop: Whether it's a loop condition. + :param loop_index: The loop index if it's a loop. + :param cloned_from: The original condition if it's a loop. + :param evaluate_result: The condition's evaluated result. + :param previous_transforms: The previous transforms which can trigger this condition. + :param following_transorms: The following transforms which will be triggered. + :param condition: The condition function. + :param session: The database session. + """ + + cond_id = orm_conditions.add_condition(request_id=request_id, internal_id=internal_id, + status=status, substatus=substatus, is_loop=is_loop, + loop_index=loop_index, cloned_from=cloned_from, + evaluate_result=evaluate_result, + previous_transforms=previous_transforms, + following_transforms=following_transforms, + condition=condition, + session=session) + return cond_id + + +@transactional_session +def update_condition(condition_id, parameters, session=None): + """ + Update condition. + + :param condition_id: The condition id. + :param parameters: Parameters as a dict. + :param session: The database session. + """ + orm_conditions.update_condition(condition_id=condition_id, parameters=parameters, session=session) + + +@transactional_session +def update_conditions(conditions, session=None): + """ + Update conditions. + + :param conditions: Condtions as a list of dict. + :param session: The database session. + """ + orm_conditions.update_conditions(conditions=conditions, session=session) + + +@read_session +def retrieve_conditions(request_id, internal_id=None, status=None, session=None): + """ + Retrieve conditions + + :param request_id: The request id. + :param intenal_id: The internal id. + :param status: The status about the condition. + :param session: The database session. + + :returns command: List of conditions + """ + conds = orm_conditions.retrieve_conditions(request_id=request_id, internal_id=internal_id, + status=status, session=session) + return conds + + +@transactional_session +def delete_conditions(request_id=None, internal_id=None, session=None): + """ + Delete all conditions with the given IDs. + + :param request_id: The request id. + :param intenal_id: The internal id. + :param session: The database session. + """ + orm_conditions.delete_condtions(request_id=request_id, internal_id=internal_id, session=session) diff --git a/main/lib/idds/orm/conditions.py b/main/lib/idds/orm/conditions.py new file mode 100644 index 00000000..e471e9fb --- /dev/null +++ b/main/lib/idds/orm/conditions.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0OA +# +# Authors: +# - Wen Guan, , 2024 + + +""" +operations related to Conditions. +""" + +import re +import datetime + +from sqlalchemy.exc import DatabaseError, IntegrityError + +from idds.common import exceptions +from idds.common.constants import ConditionStatus +from idds.orm.base import models +from idds.orm.base.session import read_session, transactional_session + + +@transactional_session +def add_condition(request_id, internal_id, status=ConditionStatus.WaitForTrigger, + substatus=None, is_loop=False, loop_index=None, cloned_from=None, + evaluate_result=None, previous_transforms=None, following_transforms=None, + condition=None, session=None): + """ + Add a condition. + + :param request_id: The request id. + :param intenal_id: The internal id. + :param status: The status about the condition. + :param substatus: The substatus about the condition. + :param is_loop: Whether it's a loop condition. + :param loop_index: The loop index if it's a loop. + :param cloned_from: The original condition if it's a loop. + :param evaluate_result: The condition's evaluated result. + :param previous_transforms: The previous transforms which can trigger this condition. + :param following_transorms: The following transforms which will be triggered. + :param condition: The condition function. + :param session: The database session. + """ + + try: + cond = models.Condition(request_id=request_id, internal_id=internal_id, + status=status, substatus=substatus, is_loop=is_loop, + loop_index=loop_index, cloned_from=cloned_from, + evaluate_result=evaluate_result, + previous_transforms=previous_transforms, + following_transforms=following_transforms, + condition=condition) + + cond.save(session=session) + cond_id = cond.condition_id + return cond_id + except TypeError as e: + raise exceptions.DatabaseException('Invalid JSON for condition: %s' % str(e)) + except DatabaseError as e: + if re.match('.*ORA-12899.*', e.args[0]) \ + or re.match('.*1406.*', e.args[0]): + raise exceptions.DatabaseException('Could not persist condition, condition too large: %s' % str(e)) + else: + raise exceptions.DatabaseException('Could not persist condition: %s' % str(e)) + + +@transactional_session +def update_condition(condition_id, parameters, session=None): + """ + Update condition. + + :param condition_id: The condition id. + :param parameters: Parameters as a dict. + :param session: The database session. + """ + + try: + parameters['updated_at'] = datetime.datetime.utcnow() + session.query(models.Condition).filter_by(condition_id=condition_id)\ + .update(parameters, synchronize_session=False) + except TypeError as e: + raise exceptions.DatabaseException('Invalid JSON for condition: %s' % str(e)) + except DatabaseError as e: + if re.match('.*ORA-12899.*', e.args[0]) \ + or re.match('.*1406.*', e.args[0]): + raise exceptions.DatabaseException('Could not persist condition, condition too large: %s' % str(e)) + else: + raise exceptions.DatabaseException('Could not persist condition: %s' % str(e)) + + +@transactional_session +def update_conditions(conditions, session=None): + """ + Update conditions. + + :param conditions: Condtions as a list of dict. + :param session: The database session. + """ + + try: + session.bulk_update_mappings(models.Conidition, conditions) + except TypeError as e: + raise exceptions.DatabaseException('Invalid JSON for condition: %s' % str(e)) + except DatabaseError as e: + if re.match('.*ORA-12899.*', e.args[0]) \ + or re.match('.*1406.*', e.args[0]): + raise exceptions.DatabaseException('Could not persist condition, condition too large: %s' % str(e)) + else: + raise exceptions.DatabaseException('Could not persist condition: %s' % str(e)) + + +@read_session +def retrieve_conditions(request_id, internal_id=None, status=None, session=None): + """ + Retrieve conditions + + :param request_id: The request id. + :param intenal_id: The internal id. + :param status: The status about the condition. + :param session: The database session. + + :returns command: List of conditions + """ + conditions = [] + try: + query = session.query(models.Condition) + + if request_id is not None: + query = query.filter_by(request_id=request_id) + if internal_id is not None: + query = query.filter_by(internal_id=internal_id) + if status is not None: + query = query.filter_by(status=status) + + tmp = query.all() + if tmp: + for t in tmp: + conditions.append(t.to_dict()) + return conditions + except IntegrityError as e: + raise exceptions.DatabaseException(e.args) + + +@transactional_session +def delete_conditions(request_id=None, internal_id=None, session=None): + """ + Delete all conditions with the given IDs. + + :param request_id: The request id. + :param intenal_id: The internal id. + :param session: The database session. + """ + try: + query = session.query(models.Condition) + + if request_id is not None: + query = query.filter_by(request_id=request_id) + if internal_id is not None: + query = query.filter_by(internal_id=internal_id) + + query.delete(synchronize_session=False) + except IntegrityError as e: + raise exceptions.DatabaseException(e.args)