Skip to content

Commit

Permalink
workflows: fix inspirehep#1944 and inspirehep#1940
Browse files Browse the repository at this point in the history
  • Loading branch information
pazembrz committed Jun 14, 2021
1 parent 807c31e commit 8b5aebe
Show file tree
Hide file tree
Showing 9 changed files with 214 additions and 12 deletions.
2 changes: 1 addition & 1 deletion inspirehep/modules/workflows/tasks/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1037,7 +1037,7 @@ def create_core_selection_wf(obj, eng):
record_control_number = obj.data.get('control_number')
if not record_control_number:
raise MissingRecordControlNumber
if is_core(obj, eng) or not _is_auto_approved(obj) or core_selection_wf_already_created(record_control_number):
if is_core(obj, eng) or not _is_auto_approved(obj) or core_selection_wf_already_created(record_control_number) or check_mark(obj, 'is-update'):
LOGGER.info("No core selection needed for %s workflow with record %s", obj.id, record_control_number)
return obj

Expand Down
8 changes: 5 additions & 3 deletions inspirehep/modules/workflows/tasks/matching.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,14 +227,16 @@ def set_wf_not_completed_ids_to_wf(obj, skip_blocked=True, skip_halted=False):
skip_halted: boolean, if True, then it skips HALTED workflows when
looking for matched workflows
"""
def _accept_only_article_wf(base_record, match_result):
return get_value(match_result, '_source._workflow.workflow_class') == "article"

def _non_completed(base_record, match_result):
return get_value(match_result,
'_source._workflow.status') != 'COMPLETED'
return get_value(match_result, '_source._workflow.status') != 'COMPLETED' \
and _accept_only_article_wf(base_record, match_result)

def _not_completed_or_halted(base_record, match_result):
return get_value(match_result, '_source._workflow.status') not in [
'COMPLETED', 'HALTED']
'COMPLETED', 'HALTED'] and _accept_only_article_wf(base_record, match_result)

def is_workflow_blocked_by_another_workflow(workflow_id):
workflow = workflow_object_class.get(workflow_id)
Expand Down
6 changes: 6 additions & 0 deletions tests/integration/tasks/test_matching.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def test_pending_holdingpen_matches_wf_if_not_completed(app, simple_record):
)
obj_id = obj.id
obj.save()
start("article", object_id=obj_id)
current_search.flush_and_refresh('holdingpen-hep')

obj2 = WorkflowObject.create(data_type='hep', **simple_record)
Expand All @@ -103,7 +104,11 @@ def test_match_previously_rejected_wf_in_holdingpen(app, simple_record):
**simple_record
)
obj_id = obj.id
obj.save()
start("article", object_id=obj_id)
obj = workflow_object_class.get(obj_id)
obj.extra_data['approved'] = False # reject it
obj.status = ObjectStatus.COMPLETED
obj.save()
current_search.flush_and_refresh('holdingpen-hep')

Expand All @@ -128,6 +133,7 @@ def test_has_same_source(app, simple_record):
)
obj_id = obj.id
obj.save()
start("article", object_id=obj_id)
current_search.flush_and_refresh('holdingpen-hep')

obj2 = WorkflowObject.create(data_type='hep', **simple_record)
Expand Down
Binary file not shown.
Binary file not shown.
71 changes: 71 additions & 0 deletions tests/integration/workflows/test_article_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@

from __future__ import absolute_import, division, print_function

import os

import mock
import pkg_resources
import requests_mock

from invenio_workflows import (
start,
Expand All @@ -34,6 +38,7 @@


from inspirehep.modules.workflows.tasks.actions import mark
from inspirehep.modules.workflows.tasks.matching import set_wf_not_completed_ids_to_wf

PUBLISHING_RECORD = {
'$schema': 'https://labs.inspirehep.net/schemas/records/hep.json',
Expand Down Expand Up @@ -279,3 +284,69 @@ def test_keywords_are_stored_in_record_when_record_is_core(mocked_robotupload, m
mark('core', True)(workflow, None)
wf.continue_workflow()
assert wf.data['keywords'] == expected_keywords


@mock.patch("inspirehep.modules.workflows.tasks.beard.json_api_request", return_value={})
@mock.patch("inspirehep.modules.workflows.tasks.magpie.json_api_request", return_value={})
@mock.patch('inspirehep.modules.workflows.tasks.upload.store_record')
@mock.patch('inspirehep.modules.workflows.tasks.submission.submit_rt_ticket', return_value="1234")
@mock.patch('inspirehep.modules.workflows.tasks.submission.send_robotupload')
def test_run_next_wf_is_not_starting_core_selection_wfs(mocked_robotupload, mocked_create_ticket, mocked_store_record, mocked_magpie, mocked_beard, mocked_external_services, workflow_app):
record = {
'$schema': 'https://labs.inspirehep.net/schemas/records/hep.json',
'titles': [
{
'title': 'Title.'
},
],
"authors": [
{
"full_name": "Some author",
}
],
'document_type': ['article'],
'_collections': ['Literature'],
'arxiv_eprints': [{'value': "1802.08709.pdf"}, ],
'control_number': 1234,
"acquisition_source": {
"datetime": "2021-06-11T06:59:01.928752",
"method": "hepcrawl",
"source": "arXiv",
},
}

workflow = build_workflow(record, extra_data={'delay': 10})
with requests_mock.Mocker() as requests_mocker:
requests_mocker.register_uri(
"GET", 'http://export.arxiv.org/pdf/1802.08709.pdf',
content=pkg_resources.resource_string(
__name__, os.path.join('fixtures', '1802.08709.pdf')
)
)
requests_mocker.register_uri("GET", "http://arxiv.org/pdf/1802.08709.pdf", text="")
requests_mocker.register_uri(
"GET", "http://export.arxiv.org/e-print/1802.08709.pdf",
content=pkg_resources.resource_string(
__name__, os.path.join('fixtures', '1802.08709.pdf')
)
)
requests_mocker.register_uri("POST", "http://grobid_url.local/api/processHeaderDocument")
start("article", object_id=workflow.id)

wf = workflow_object_class.get(workflow.id)
mark('auto-approved', True)(workflow, None)
wf.callback_pos = [34, 1, 13]
wf.continue_workflow()
workflow = build_workflow(record)
with requests_mock.Mocker() as requests_mocker:
requests_mocker.register_uri("GET", 'http://export.arxiv.org/pdf/1802.08709.pdf',
content=pkg_resources.resource_string(
__name__, os.path.join('fixtures', '1802.08709.pdf')), )
requests_mocker.register_uri("GET", "http://arxiv.org/pdf/1802.08709.pdf", text="")
requests_mocker.register_uri("GET", "http://export.arxiv.org/e-print/1802.08709.pdf",
content=pkg_resources.resource_string(
__name__, os.path.join('fixtures', '1802.08709.pdf')), )
requests_mocker.register_uri("POST", "http://grobid_url.local/api/processHeaderDocument")
start("article", object_id=workflow.id)
matched = set_wf_not_completed_ids_to_wf(workflow)
assert matched == []
62 changes: 58 additions & 4 deletions tests/integration/workflows/test_arxiv_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -802,10 +802,37 @@ def test_previously_rejected_from_not_fully_harvested_category_is_not_auto_appro
assert obj2.status == ObjectStatus.COMPLETED


def test_match_wf_in_error_goes_in_error_state(workflow_app):
@mock.patch(
"inspirehep.modules.workflows.tasks.arxiv.download_file_to_workflow",
side_effect=fake_download_file,
)
@mock.patch(
"inspirehep.modules.workflows.tasks.actions.download_file_to_workflow",
side_effect=fake_download_file,
)
@mock.patch("inspirehep.modules.workflows.tasks.arxiv.is_pdf_link", return_value=True)
@mock.patch(
"inspirehep.modules.workflows.tasks.beard.json_api_request",
side_effect=fake_beard_api_request,
)
@mock.patch(
"inspirehep.modules.workflows.tasks.magpie.json_api_request",
side_effect=fake_magpie_api_request,
)
def test_match_wf_in_error_goes_in_error_state(
mocked_api_request_magpie,
mocked_api_request_beard,
mocked_is_pdf_link,
mocked_package_download,
mocked_arxiv_download,
workflow_app,
mocked_external_services
):
record = generate_record()

obj = workflow_object_class.create(data=record, data_type="hep")
wf_id = build_workflow(record).id
start("article", object_id=wf_id)
obj = workflow_object_class.get(wf_id)
obj.status = ObjectStatus.ERROR
obj.save()
current_search.flush_and_refresh("holdingpen-hep")
Expand All @@ -815,10 +842,37 @@ def test_match_wf_in_error_goes_in_error_state(workflow_app):
start("article", object_id=workflow_id)


def test_match_wf_in_error_goes_in_initial_state(workflow_app):
@mock.patch(
"inspirehep.modules.workflows.tasks.arxiv.download_file_to_workflow",
side_effect=fake_download_file,
)
@mock.patch(
"inspirehep.modules.workflows.tasks.actions.download_file_to_workflow",
side_effect=fake_download_file,
)
@mock.patch("inspirehep.modules.workflows.tasks.arxiv.is_pdf_link", return_value=True)
@mock.patch(
"inspirehep.modules.workflows.tasks.beard.json_api_request",
side_effect=fake_beard_api_request,
)
@mock.patch(
"inspirehep.modules.workflows.tasks.magpie.json_api_request",
side_effect=fake_magpie_api_request,
)
def test_match_wf_in_error_goes_in_initial_state(
mocked_api_request_magpie,
mocked_api_request_beard,
mocked_is_pdf_link,
mocked_package_download,
mocked_arxiv_download,
workflow_app,
mocked_external_services
):
record = generate_record()

obj = workflow_object_class.create(data=record, data_type="hep")
wf_id = build_workflow(record).id
start("article", object_id=wf_id)
obj = workflow_object_class.get(wf_id)
obj.status = ObjectStatus.INITIAL
obj.save()
current_search.flush_and_refresh("holdingpen-hep")
Expand Down
62 changes: 62 additions & 0 deletions tests/integration/workflows/test_workflow_core_selection.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,68 @@ def test_core_selection_wf_starts_after_article_wf_when_no_core(mocked_api_reque
assert mock.request_history[1].json() == expected_record_data


@mock.patch('inspirehep.modules.workflows.tasks.submission.send_robotupload')
@mock.patch('inspirehep.modules.workflows.tasks.submission.submit_rt_ticket', return_value="1234")
@mock.patch(
"inspirehep.modules.workflows.tasks.beard.json_api_request",
side_effect=fake_beard_api_request,
)
@mock.patch(
"inspirehep.modules.workflows.tasks.magpie.json_api_request",
side_effect=fake_magpie_api_request,
)
def test_core_selection_wf_is_not_created_when_wf_is_record_update(mocked_api_request_magpie, mocked_api_request_beard, mocked_rt, mocked_send_robotupload, workflow_app, mocked_external_services):
pid_value = 123456
mocked_url = "{inspirehep_url}/{endpoint}/{control_number}".format(
inspirehep_url=current_app.config.get("INSPIREHEP_URL"),
endpoint='literature',
control_number=pid_value
)
record = {
"_collections": [
"Literature"
],
"titles": [
{"title": "A title"},
],
"document_type": [
"report"
],
"collaborations": [
{"value": "SHIP"}
],
"control_number": pid_value,
}

workflow_object = workflow_object_class.create(
data=record,
id_user=None,
data_type='hep'
)
workflow_object.extra_data['source_data'] = {"data": record, "extra_data": {"source_data": {"data": record}}}
workflow_object.save()

with override_config(FEATURE_FLAG_ENABLE_REST_RECORD_MANAGEMENT=True):
with requests_mock.Mocker() as mock:
mock.register_uri('GET', mocked_url, json=load_json_record('hep_record_no_core.json'))
mock.register_uri('PUT', "http://web:8000/literature/{control_number}".format(control_number=pid_value), json={"metadata": {"control_number": pid_value}})

start("article", object_id=workflow_object.id)

assert WorkflowObjectModel.query.filter(WorkflowObjectModel.workflow.has(name="core_selection")).count() == 0

workflow_object.callback_pos = [34, 1, 13]
# Run task for creating core_selection wf
workflow_object.extra_data['auto-approved'] = True
workflow_object.extra_data['is-update'] = True
workflow_object.save()

workflow_object.continue_workflow('restart_task')

assert WorkflowObjectModel.query.filter(WorkflowObjectModel.workflow.has(name="core_selection")).count() == 0
assert workflow_object.status == ObjectStatus.COMPLETED


@mock.patch('inspirehep.modules.workflows.tasks.submission.send_robotupload')
@mock.patch('inspirehep.modules.workflows.tasks.submission.submit_rt_ticket', return_value="1234")
@mock.patch(
Expand Down
15 changes: 11 additions & 4 deletions tests/integration_async/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

from __future__ import absolute_import, division, print_function

import uuid

import time
import pytest

Expand All @@ -31,7 +33,7 @@

from invenio_db import db
from invenio_search import current_search
from invenio_workflows import ObjectStatus, start, workflow_object_class
from invenio_workflows import ObjectStatus, start, workflow_object_class, Workflow


def build_workflow(workflow_data, data_type='hep', **kwargs):
Expand All @@ -46,6 +48,11 @@ def build_workflow(workflow_data, data_type='hep', **kwargs):
},
**kwargs
)
workflow_object.save()
wf = Workflow(name='article', extra_data=workflow_object.extra_data, uuid=uuid.uuid4())
wf.save()
workflow_object.id_workflow = wf.uuid
workflow_object.save()
return workflow_object


Expand All @@ -59,7 +66,7 @@ def check_wf_state(workflow_id, desired_status, max_time=550): # Travis fails a
"""
start = datetime.now()
end = start + timedelta(seconds=max_time)
while (True):
while True:
db.session.close()
if workflow_object_class.get(workflow_id).status == desired_status:
return
Expand Down Expand Up @@ -212,10 +219,10 @@ def test_wf_not_stops_when_blocking_another_one_after_restarted_on_init(
check_wf_state(wf2_id, ObjectStatus.INITIAL)
check_wf_state(wf3_id, ObjectStatus.INITIAL)

start.delay('article', object_id=wf3_id)

current_search.flush_and_refresh('holdingpen-hep')

start.delay('article', object_id=wf3_id)

check_wf_state(wf1_id, ObjectStatus.INITIAL)
check_wf_state(wf2_id, ObjectStatus.INITIAL)
check_wf_state(wf3_id, ObjectStatus.ERROR)
Expand Down

0 comments on commit 8b5aebe

Please sign in to comment.