diff --git a/cdci_data_analysis/pytest_fixtures.py b/cdci_data_analysis/pytest_fixtures.py index 59d0bfbb8..9a5d26a90 100644 --- a/cdci_data_analysis/pytest_fixtures.py +++ b/cdci_data_analysis/pytest_fixtures.py @@ -577,6 +577,11 @@ def dispatcher_test_conf_with_matrix_options_fn(dispatcher_test_conf_fn): yield fn +@pytest.fixture +def dispatcher_no_cc_matrix_room_ids(monkeypatch): + monkeypatch.delenv('MATRIX_CC_RECEIVER_ROOM_ID', raising=False) + + @pytest.fixture def dispatcher_test_conf_with_gallery_no_resolver_fn(dispatcher_test_conf_fn): fn = "test-dispatcher-conf-with-gallery.yaml" diff --git a/tests/conftest.py b/tests/conftest.py index be7962151..846829e46 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -14,6 +14,7 @@ dispatcher_long_living_fixture, gunicorn_dispatcher_long_living_fixture, dispatcher_long_living_fixture_with_matrix_options, + dispatcher_no_cc_matrix_room_ids, gunicorn_dispatcher_long_living_fixture_with_matrix_options, dispatcher_test_conf, dispatcher_test_conf_with_gallery, diff --git a/tests/test_matrix_messages.py b/tests/test_matrix_messages.py index 9626893f1..20d64fcdd 100644 --- a/tests/test_matrix_messages.py +++ b/tests/test_matrix_messages.py @@ -1202,3 +1202,343 @@ def test_incident_report(dispatcher_live_fixture_with_matrix_options, ) +@pytest.mark.test_matrix +@pytest.mark.parametrize("default_values", [True, False]) +@pytest.mark.parametrize("time_original_request_none", [False]) +@pytest.mark.parametrize("request_cred", ['public', 'private', 'private-no-matrix-message', 'private-no-room-id']) +def test_matrix_message_run_analysis_callback_no_room_ids(dispatcher_no_cc_matrix_room_ids, + gunicorn_dispatcher_long_living_fixture_with_matrix_options, + dispatcher_test_conf_with_matrix_options, + dispatcher_local_matrix_message_server, + default_values, request_cred, time_original_request_none): + DataServerQuery.set_status('submitted') + + server = gunicorn_dispatcher_long_living_fixture_with_matrix_options + + DispatcherJobState.remove_scratch_folders() + + token_none = (request_cred == 'public') + + expect_matrix_message = True + token_payload = {**default_token_payload, + "tmx": 0, + "tem": 0, + "mxstout": True, + "mxintsub": 5, + "mxsub": True, + "mssub": False, + "msdone": False, + "msfail": False, + "mxroomid": dispatcher_local_matrix_message_server.room_id + } + + if token_none: + encoded_token = None + else: + # let's generate a valid token with high threshold + + if default_values: + token_payload.pop('tmx') + token_payload.pop('mxstout') + token_payload.pop('mxsub') + token_payload.pop('mxintsub') + + if request_cred == 'private-no-matrix-message': + token_payload['mxsub'] = False + token_payload['mxdone'] = False + token_payload['mxfail'] = False + expect_matrix_message = False + elif request_cred == 'private-no-room-id': + token_payload.pop('mxroomid', None) + + encoded_token = jwt.encode(token_payload, secret_key, algorithm='HS256') + + dict_param = dict( + query_status="new", + query_type="Real", + instrument="empty-async", + product_type="dummy", + token=encoded_token + ) + + # this should return status submitted, so matrix message sent + c = requests.get(os.path.join(server, "run_analysis"), + dict_param + ) + assert c.status_code == 200 + jdata = c.json() + + session_id = jdata['session_id'] + job_id = jdata['job_monitor']['job_id'] + + logger.info("response from run_analysis: %s", json.dumps(jdata, indent=4)) + dispatcher_job_state = DispatcherJobState.from_run_analysis_response(c.json()) + + assert jdata['query_status'] == "submitted" + + completed_dict_param = {**dict_param, + 'use_scws': 'no', + 'src_name': '1E 1740.7-2942', + 'RA': 265.97845833, + 'DEC': -29.74516667, + 'T1': '2017-03-06T13:26:48.000', + 'T2': '2017-03-06T15:32:27.000', + 'T_format': 'isot' + } + + products_url = DispatcherJobState.get_expected_products_url(completed_dict_param, + token=encoded_token, + session_id=session_id, + job_id=job_id) + assert jdata['exit_status']['job_status'] == 'submitted' + # get the original time the request was made + assert 'time_request' in jdata + # set the time the request was initiated + time_request = jdata['time_request'] + time_request_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(float(time_request))) + + if token_none or not expect_matrix_message: + # matrix message not supposed to be sent for public request + assert 'matrix_message_status' not in jdata + else: + + assert 'matrix_message_status' in jdata['exit_status'] + assert jdata['exit_status']['matrix_message_status'] == 'matrix message sent' + assert 'matrix_message_status_details' in jdata['exit_status'] + matrix_message_status_details_obj = json.loads(jdata['exit_status']['matrix_message_status_details']) + assert 'res_content' in matrix_message_status_details_obj + if request_cred == 'private-no-room-id': + assert 'res_content_token_user' not in matrix_message_status_details_obj['res_content'] + else: + assert 'res_content_token_user' in matrix_message_status_details_obj['res_content'] + matrix_user_message_event_id = \ + matrix_message_status_details_obj['res_content']['res_content_token_user']['event_id'] + + validate_matrix_message_content( + dispatcher_local_matrix_message_server.get_matrix_message_record( + room_id=token_payload['mxroomid'], + event_id=matrix_user_message_event_id), + 'submitted', + room_id=token_payload['mxroomid'], + event_id=matrix_user_message_event_id, + user_id=token_payload['user_id'], + dispatcher_job_state=dispatcher_job_state, + variation_suffixes=["dummy"], + time_request_str=time_request_str, + request_params=dict_param, + products_url=products_url, + dispatcher_live_fixture=None, + require_reference_matrix_message=True + ) + + assert 'res_content_cc_users' in matrix_message_status_details_obj['res_content'] + assert isinstance(matrix_message_status_details_obj['res_content']['res_content_cc_users'], list) + assert len(matrix_message_status_details_obj['res_content']['res_content_cc_users']) == 0 + + # for the call_back(s) in case the time of the original request is not provided + if time_original_request_none: + time_request = None + time_request_str = 'None' + + for i in range(5): + # imitating what a backend would do + current_action = 'progress' if i > 2 else 'main_done' + c = requests.get(os.path.join(server, "call_back"), + params=dict( + job_id=dispatcher_job_state.job_id, + session_id=dispatcher_job_state.session_id, + instrument_name="empty-async", + action=current_action, + node_id=f'node_{i}', + message='progressing', + token=encoded_token, + time_original_request=time_request + )) + assert dispatcher_job_state.load_job_state_record(f'node_{i}', "progressing")['full_report_dict'][ + 'action'] == current_action + + c = requests.get(os.path.join(server, "run_analysis"), + params=dict( + query_status="submitted", # whether query is new or not, this should work + query_type="Real", + instrument="empty-async", + product_type="dummy", + async_dispatcher=False, + session_id=dispatcher_job_state.session_id, + job_id=dispatcher_job_state.job_id, + token=encoded_token + )) + assert c.json()['query_status'] == 'progress' # always progress! + + # we should now find progress records + c = requests.get(os.path.join(server, "run_analysis"), + {**dict_param, + "query_status": "submitted", + "job_id": job_id, + "session_id": session_id, + } + ) + + assert c.status_code == 200 + jdata = c.json() + + assert len(jdata['job_monitor']['full_report_dict_list']) == 6 + assert [c['action'] for c in jdata['job_monitor']['full_report_dict_list']] == [ + 'main_done', 'main_done', 'main_done', 'progress', 'progress', 'progress'] + + c = requests.get(os.path.join(server, "call_back"), + params=dict( + job_id=dispatcher_job_state.job_id, + session_id=dispatcher_job_state.session_id, + instrument_name="empty-async", + action='main_incorrect_status', + node_id=f'node_{i + 1}', + message='progressing', + token=encoded_token, + time_original_request=time_request + )) + assert c.status_code == 200 + + c = requests.get(os.path.join(server, "run_analysis"), + { + **dict_param, + "query_status": "submitted", + "job_id": job_id, + "session_id": session_id, + } + ) + assert c.status_code == 200 + assert c.json()['query_status'] == 'progress' + + # this does nothing special + c = requests.get(os.path.join(server, "call_back"), + params=dict( + job_id=dispatcher_job_state.job_id, + session_id=dispatcher_job_state.session_id, + instrument_name="empty-async", + action='ready', + node_id='node_ready', + message='ready', + token=encoded_token, + time_original_request=time_request + )) + + DataServerQuery.set_status('done') + + # this triggers a message via matrix + c = requests.get(os.path.join(server, "call_back"), + params=dict( + job_id=dispatcher_job_state.job_id, + session_id=dispatcher_job_state.session_id, + instrument_name="empty-async", + action='done', + node_id='node_final', + message='done', + token=encoded_token, + time_original_request=time_request + )) + + assert c.status_code == 200 + + jdata = dispatcher_job_state.load_job_state_record('node_final', 'done') + + if token_none or not expect_matrix_message: + assert 'matrix_message_status' not in jdata + + elif time_original_request_none: + assert 'matrix_message_status' in jdata + + elif default_values: + assert 'matrix_message_status' not in jdata + + else: + assert 'matrix_message_status' in jdata + assert jdata['matrix_message_status'] == 'matrix message sent' + assert 'matrix_message_status_details' in jdata + matrix_message_status_details_obj = json.loads(jdata['matrix_message_status_details']) + assert 'res_content' in matrix_message_status_details_obj + assert 'res_content_cc_users' in matrix_message_status_details_obj['res_content'] + assert isinstance(matrix_message_status_details_obj['res_content']['res_content_cc_users'], list) + assert len(matrix_message_status_details_obj['res_content']['res_content_cc_users']) == 0 + + if request_cred == 'private-no-room-id': + assert 'res_content_token_user' not in matrix_message_status_details_obj['res_content'] + else: + assert 'res_content_token_user' in matrix_message_status_details_obj['res_content'] + assert 'event_id' in matrix_message_status_details_obj['res_content']['res_content_token_user'] + matrix_user_message_event_id = \ + matrix_message_status_details_obj['res_content']['res_content_token_user']['event_id'] + + validate_matrix_message_content( + dispatcher_local_matrix_message_server.get_matrix_message_record( + room_id=token_payload['mxroomid'], + event_id=matrix_user_message_event_id), + 'done', + room_id=token_payload['mxroomid'], + event_id=matrix_user_message_event_id, + user_id=token_payload['user_id'], + dispatcher_job_state=dispatcher_job_state, + time_request_str=time_request_str, + dispatcher_live_fixture=server, + require_reference_matrix_message=True + ) + + # check the matrix message in the matrix message folders, and that the first one was produced + dispatcher_job_state.assert_matrix_message(state="done") + + # this also triggers matrix message (simulate a failed request) + c = requests.get(os.path.join(server, "call_back"), + params={ + 'job_id': dispatcher_job_state.job_id, + 'session_id': dispatcher_job_state.session_id, + 'instrument_name': "empty-async", + 'action': 'failed', + 'node_id': 'node_failed', + 'message': 'failed', + 'token': encoded_token, + 'time_original_request': time_request + }) + + assert c.status_code == 200 + + jdata = dispatcher_job_state.load_job_state_record('node_failed', 'failed') + + if token_none or not expect_matrix_message: + # matrix message not supposed to be sent for public request + assert 'matrix_message_status' not in jdata + assert 'matrix_message_status_details' not in jdata + else: + assert 'matrix_message_status' in jdata + assert jdata['matrix_message_status'] == 'matrix message sent' + assert 'matrix_message_status_details' in jdata + matrix_message_status_details_obj = json.loads(jdata['matrix_message_status_details']) + assert 'res_content' in matrix_message_status_details_obj + assert 'res_content_cc_users' in matrix_message_status_details_obj['res_content'] + assert isinstance(matrix_message_status_details_obj['res_content']['res_content_cc_users'], list) + assert len(matrix_message_status_details_obj['res_content']['res_content_cc_users']) == 0 + if request_cred == 'private-no-room-id': + assert 'res_content_token_user' not in matrix_message_status_details_obj['res_content'] + else: + assert 'res_content_token_user' in matrix_message_status_details_obj['res_content'] + assert 'event_id' in matrix_message_status_details_obj['res_content']['res_content_token_user'] + + matrix_message_event_id_obj = matrix_message_status_details_obj['res_content']['res_content_token_user']['event_id'] + + # check the matrix message in the matrix message folders, and that the first one was produced + if default_values or time_original_request_none: + dispatcher_job_state.assert_matrix_message('failed', comment="expected one matrix message in total, failed") + else: + dispatcher_job_state.assert_matrix_message('failed', comment="expected two matrix message in total, second failed") + + validate_matrix_message_content( + dispatcher_local_matrix_message_server.get_matrix_message_record(room_id=token_payload['mxroomid'], + event_id=matrix_message_event_id_obj), + 'failed', + room_id=token_payload['mxroomid'], + event_id=matrix_message_event_id_obj, + user_id=token_payload['user_id'], + dispatcher_job_state=dispatcher_job_state, + time_request_str=time_request_str, + dispatcher_live_fixture=server, + require_reference_matrix_message=True + )