From bf88d017747ef8a07fa602701f5822814dd73e9f Mon Sep 17 00:00:00 2001 From: Henrik Ek Date: Wed, 11 Dec 2024 21:55:24 +0100 Subject: [PATCH] Make it possible to revoke pending tasks (#2343) --- ESSArch_Core/WorkflowEngine/models.py | 26 ++++++++----------- ESSArch_Core/WorkflowEngine/serializers.py | 2 +- ESSArch_Core/WorkflowEngine/signals.py | 22 ++++++++++++++++ ESSArch_Core/WorkflowEngine/views.py | 4 +-- .../views/modals/step_info_modal.html | 4 +-- .../views/modals/task_info_modal.html | 8 +++--- .../static/frontend/views/workflow/redo.html | 2 +- ESSArch_Core/ip/models.py | 6 ++--- ESSArch_Core/ip/views.py | 4 +++ ESSArch_Core/storage/models.py | 10 ++++--- ESSArch_Core/util.py | 2 +- ESSArch_Core/workflow/tasks.py | 4 ++- requirements/base.txt | 6 ++--- requirements/tests.txt | 2 +- 14 files changed, 64 insertions(+), 38 deletions(-) diff --git a/ESSArch_Core/WorkflowEngine/models.py b/ESSArch_Core/WorkflowEngine/models.py index 14fa7c9c1..81763ecc6 100644 --- a/ESSArch_Core/WorkflowEngine/models.py +++ b/ESSArch_Core/WorkflowEngine/models.py @@ -295,6 +295,8 @@ def retry(self, direct=True): none """ + logger = logging.getLogger('essarch.WorkflowEngine') + logger.info('Retrying step {} ({})'.format(self.name, self.pk)) child_steps = self.child_steps.all() tasks = self.tasks(manager='by_step_pos').filter( @@ -338,19 +340,12 @@ def resume(self, direct=True): """ logger = logging.getLogger('essarch.WorkflowEngine') - logger.debug('Resuming step {} ({})'.format(self.name, self.pk)) - ProcessTask.objects.filter( + logger.info('Resuming step {} ({})'.format(self.name, self.pk)) + for t in ProcessTask.objects.filter( processstep__in=self.get_descendants(include_self=True), status__in=[celery_states.PENDING, celery_states.FAILURE, celery_states.REVOKED], - ).update( - status=celery_states.PENDING, - time_started=None, - time_done=None, - traceback='', - exception='', - progress=0, - result=None, - ) + ): + t.reset() child_steps = self.get_children() step_descendants = self.get_descendants(include_self=True) @@ -720,7 +715,10 @@ def get_remote_copy(self, session, host): return r def reset(self): + logger = logging.getLogger('essarch.WorkflowEngine') + logger.info('Reset task ({})'.format(self.pk)) self.status = celery_states.PENDING + self.celery_id = uuid.uuid4() self.time_started = None self.time_done = None self.traceback = '' @@ -778,19 +776,17 @@ def run(self): def revoke(self): logger = logging.getLogger('essarch.WorkflowEngine') - logger.debug('Revoking task ({})'.format(self.pk)) + logger.info('Revoke task ({})'.format(self.pk)) current_app.control.revoke(str(self.celery_id), terminate=True) self.status = celery_states.REVOKED - self.celery_id = uuid.uuid4() self.save() - logger.info('Revoked task ({})'.format(self.pk)) def retry(self): """ Retries the task """ logger = logging.getLogger('essarch.WorkflowEngine') - logger.debug('Retrying task ({})'.format(self.pk)) + logger.info('Retrying task ({})'.format(self.pk)) self.reset() return self.run() diff --git a/ESSArch_Core/WorkflowEngine/serializers.py b/ESSArch_Core/WorkflowEngine/serializers.py index 604cf483f..552100d88 100644 --- a/ESSArch_Core/WorkflowEngine/serializers.py +++ b/ESSArch_Core/WorkflowEngine/serializers.py @@ -125,7 +125,7 @@ class Meta: 'information_package_str', 'eager', ) read_only_fields = ( - 'id', 'progress', 'time_created', 'time_started', 'time_done', 'retried', + 'id', 'time_created', 'time_started', 'time_done', 'retried', ) extra_kwargs = { 'id': { diff --git a/ESSArch_Core/WorkflowEngine/signals.py b/ESSArch_Core/WorkflowEngine/signals.py index 5faea1c68..5ecf0ad9d 100644 --- a/ESSArch_Core/WorkflowEngine/signals.py +++ b/ESSArch_Core/WorkflowEngine/signals.py @@ -1,3 +1,6 @@ +import logging + +from celery.signals import task_received, task_revoked from django.db.models.signals import post_save, pre_save from django.dispatch import receiver @@ -24,3 +27,22 @@ def step_post_save(sender, instance, created, **kwargs): instance.parent.clear_cache() except AttributeError: pass + + +@task_received.connect +def task_received_handler(request=None, **kwargs): + logger = logging.getLogger('essarch') + try: + t = ProcessTask.objects.get(celery_id=request.task_id) + logger.debug('{} signal task_received status is {}'.format(request.task_id, repr(t.status))) + if t.status == 'REVOKED': + t.revoke() + except ProcessTask.DoesNotExist: + logger.debug('{} signal task_received without ProcessTask'.format(request.task_id)) + pass + + +@task_revoked.connect +def task_revoked_handler(request=None, **kwargs): + logger = logging.getLogger('essarch') + logger.debug('{} signal task_revoked'.format(request.id)) diff --git a/ESSArch_Core/WorkflowEngine/views.py b/ESSArch_Core/WorkflowEngine/views.py index 194d2a516..f1639be81 100644 --- a/ESSArch_Core/WorkflowEngine/views.py +++ b/ESSArch_Core/WorkflowEngine/views.py @@ -147,8 +147,8 @@ def run(self, request, pk=None): @action(detail=True, methods=['post'], permission_classes=[CanRevoke]) def revoke(self, request, pk=None): obj = self.get_object() - if obj.status != celery_states.STARTED: - raise exceptions.ParseError('Only running tasks can be revoked') + if obj.status not in [celery_states.STARTED, celery_states.PENDING]: + raise exceptions.ParseError('Only running or pending tasks can be revoked') obj.revoke() return Response({'status': 'revoked task'}) diff --git a/ESSArch_Core/frontend/static/frontend/views/modals/step_info_modal.html b/ESSArch_Core/frontend/static/frontend/views/modals/step_info_modal.html index 021f03093..ed7b913aa 100644 --- a/ESSArch_Core/frontend/static/frontend/views/modals/step_info_modal.html +++ b/ESSArch_Core/frontend/static/frontend/views/modals/step_info_modal.html @@ -80,7 +80,7 @@
{{ ("ERROR.ERROR" | translate) + ":" }} @@ -90,7 +90,7 @@
diff --git a/ESSArch_Core/frontend/static/frontend/views/workflow/redo.html b/ESSArch_Core/frontend/static/frontend/views/workflow/redo.html index 2070184af..7bea40758 100644 --- a/ESSArch_Core/frontend/static/frontend/views/workflow/redo.html +++ b/ESSArch_Core/frontend/static/frontend/views/workflow/redo.html @@ -1,6 +1,6 @@ {{ "REDO" | translate }} diff --git a/ESSArch_Core/ip/models.py b/ESSArch_Core/ip/models.py index 591f7ef97..801a50781 100644 --- a/ESSArch_Core/ip/models.py +++ b/ESSArch_Core/ip/models.py @@ -1426,7 +1426,7 @@ def create_preservation_workflow(self): return workflow def create_access_workflow(self, user, tar=False, extracted=False, new=False, object_identifier_value=None, - package_xml=False, aic_xml=False, diff_check=False, edit=False): + package_xml=False, aic_xml=False, diff_check=False, edit=False, responsible=None): logger = logging.getLogger('essarch.ip') if new: dst_object_identifier_value = object_identifier_value or str(uuid.uuid4()) @@ -1511,7 +1511,7 @@ def create_access_workflow(self, user, tar=False, extracted=False, new=False, ob } ) - return create_workflow(workflow, self, name='Access Information Package') + return create_workflow(workflow, self, name='Access Information Package', responsible=responsible) if tar: try: @@ -1981,7 +1981,7 @@ def create_access_workflow(self, user, tar=False, extracted=False, new=False, ob "queue": worker_queue, "args": [str(new_aip.pk), str(user.pk), Workarea.ACCESS, tar] }) - return create_workflow(workflow, self, name='Access Information Package') + return create_workflow(workflow, self, name='Access Information Package', responsible=responsible) def create_migration_workflow(self, temp_path, storage_methods, export_path='', tar=False, extracted=False, package_xml=False, aic_xml=False, diff_check=True, responsible=None): diff --git a/ESSArch_Core/ip/views.py b/ESSArch_Core/ip/views.py index 32f83bc04..8c5371aa9 100644 --- a/ESSArch_Core/ip/views.py +++ b/ESSArch_Core/ip/views.py @@ -1820,6 +1820,9 @@ def access(self, request, pk=None): raise exceptions.ParseError('IP must either have state "Received" or be archived to be accessed') data = request.data + user = None + if request and hasattr(request, "user"): + user = request.user options = ['tar', 'extracted', 'edit'] if ip.package_type == InformationPackage.AIP: @@ -1878,6 +1881,7 @@ def access(self, request, pk=None): aic_xml=data.get('aic_xml', False), diff_check=data.get('diff_check', False), edit=data.get('edit', False), + responsible=user, ) workflow.run() return Response({'detail': gettext('Accessing {ip}...').format(ip=ip), 'step': workflow.pk}) diff --git a/ESSArch_Core/storage/models.py b/ESSArch_Core/storage/models.py index 505af5771..e0e2ac8d0 100644 --- a/ESSArch_Core/storage/models.py +++ b/ESSArch_Core/storage/models.py @@ -744,8 +744,10 @@ def fastest(self): output_field=IntegerField(), ) remote = Case( - When(storage_medium__storage_target__remote_server__isnull=True, then=Value(1)), - When(storage_medium__storage_target__remote_server__isnull=False, then=Value(2)), + When((Q(storage_medium__storage_target__remote_server=None) | + Q(storage_medium__storage_target__remote_server='')), then=Value(1)), + When(~(Q(storage_medium__storage_target__remote_server=None) | + Q(storage_medium__storage_target__remote_server='')), then=Value(2)), output_field=IntegerField(), ) storage_type = Case( @@ -763,8 +765,8 @@ def fastest(self): remote=remote, storage_type=storage_type, content_location_value_int=content_location_value_int, - ).order_by('remote', 'container_order', 'storage_type').natural_sort('storage_medium__medium_id' - ).order_by('content_location_value_int') + ).order_by('remote', 'container_order', 'storage_type', + 'storage_medium__medium_id', 'content_location_value_int') class StorageObject(models.Model): diff --git a/ESSArch_Core/util.py b/ESSArch_Core/util.py index 26f35fe3d..7378b68ff 100644 --- a/ESSArch_Core/util.py +++ b/ESSArch_Core/util.py @@ -177,7 +177,7 @@ def get_value_from_path(root, path): try: el = get_elements_without_namespace(root, path)[0] except IndexError: - logger.warning('{path} not found in {root}'.format(path=path, root=root.getroottree().getpath(root))) + logger.debug('{path} not found in {root}'.format(path=path, root=root.getroottree().getpath(root))) return None if "@" in path: diff --git a/ESSArch_Core/workflow/tasks.py b/ESSArch_Core/workflow/tasks.py index d06fe4faf..a8dcc711b 100644 --- a/ESSArch_Core/workflow/tasks.py +++ b/ESSArch_Core/workflow/tasks.py @@ -253,7 +253,9 @@ def AccessAIP(self, aip, storage_object=None, tar=True, extracted=False, new=Fal aip.access(storage_object, self.get_processtask(), dst=dst) - self.create_success_event("Retrieved information package from storage to workspace") + msg = "Retrieved information package from storage {} to workspace".format(storage_object.storage_medium.medium_id) + self.create_success_event(msg) + return msg @app.task(bind=True, queue='robot', track=False) diff --git a/requirements/base.txt b/requirements/base.txt index 792138c74..3d84c4182 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -1,5 +1,5 @@ asgiref==3.8.1 -boto3==1.35.72 +boto3==1.35.78 celery[tblib]==5.4.0 cffi==1.17.1 channels==4.2.0 @@ -9,7 +9,7 @@ click==8.1.3 cryptography==44.0.0 daphne==4.1.2 dj-rest-auth[with-social]==7.0.0 -django==5.0.9 +django==5.0.10 django-allauth==0.61.1 django-cors-headers==4.6.0 django-countries-plus==2.2.0 @@ -46,7 +46,7 @@ opf-fido==1.6.1 pyfakefs==5.7.2 python-dateutil==2.8.2 pywin32==308 ; platform_system=='Windows' -redis==5.2.0 +redis==5.2.1 regex==2024.11.6 requests==2.32.3 requests-toolbelt==1.0.0 diff --git a/requirements/tests.txt b/requirements/tests.txt index bb49efc5f..5d700a2fb 100644 --- a/requirements/tests.txt +++ b/requirements/tests.txt @@ -1,3 +1,3 @@ -coverage==7.6.8 +coverage==7.6.9 django-test-without-migrations==0.6 selenium==4.25.0