diff --git a/misc/upparat_create_job.py b/misc/upparat_create_job.py index dd4c03d..a6e0be6 100755 --- a/misc/upparat_create_job.py +++ b/misc/upparat_create_job.py @@ -169,6 +169,9 @@ def main(arguments): print(f" → ARN IoT: {arguments.arn_iot}") print(f" → Things: {thing_names}") print(f" → Groups: {group_names}\n") + print(f"AWS IoT Job Document:\n") + print(json.dumps(json.loads(document), indent=2)) + print("") if not arguments.dry_run: create_job( diff --git a/src/upparat/events.py b/src/upparat/events.py index 1fd69b0..3b754d5 100644 --- a/src/upparat/events.py +++ b/src/upparat/events.py @@ -11,6 +11,7 @@ JOB_REVOKED = "job-revoked" SELECT_JOB_INTERRUPTED = "selected-job-interrupted" +SELECT_JOB_ACTION_MISMATCH = "selected-job-action-mismatch" DOWNLOAD_COMPLETED = "download-completed" DOWNLOAD_INTERRUPTED = "download-interrupted" diff --git a/src/upparat/jobs.py b/src/upparat/jobs.py index fa0813d..7e01167 100644 --- a/src/upparat/jobs.py +++ b/src/upparat/jobs.py @@ -4,6 +4,8 @@ from upparat.config import settings +UPPARAT_ACTION = "upparat-update" + # AWS job execution EXECUTION = "execution" EXECUTION_STATE = "executionState" @@ -14,6 +16,7 @@ # AWS job document JOB_ID = "jobId" JOB_DOCUMENT = "jobDocument" +JOB_DOCUMENT_ACTION = "action" JOB_DOCUMENT_FILE = "file" JOB_DOCUMENT_VERSION = "version" JOB_DOCUMENT_META = "meta" diff --git a/src/upparat/statemachine/machine.py b/src/upparat/statemachine/machine.py index d5532f2..7de5345 100644 --- a/src/upparat/statemachine/machine.py +++ b/src/upparat/statemachine/machine.py @@ -13,6 +13,7 @@ from upparat.events import JOBS_AVAILABLE from upparat.events import NO_JOBS_PENDING from upparat.events import RESTART_INTERRUPTED +from upparat.events import SELECT_JOB_ACTION_MISMATCH from upparat.events import SELECT_JOB_INTERRUPTED from upparat.statemachine import UpparatStateMachine from upparat.statemachine.download import DownloadState @@ -71,6 +72,10 @@ def create_statemachine(event_queue, mqtt_client): select_job_state, fetch_jobs_state, events=[SELECT_JOB_INTERRUPTED] ) + statemachine.add_transition( + select_job_state, monitor_state, events=[SELECT_JOB_ACTION_MISMATCH] + ) + # Job is ready for process statemachine.add_transition(verify_job_state, download_state, events=[JOB_VERIFIED]) diff --git a/src/upparat/statemachine/select_job.py b/src/upparat/statemachine/select_job.py index 742c6c2..699cae4 100644 --- a/src/upparat/statemachine/select_job.py +++ b/src/upparat/statemachine/select_job.py @@ -14,6 +14,7 @@ from upparat.events import MQTT_EVENT_TOPIC from upparat.events import MQTT_MESSAGE_RECEIVED from upparat.events import MQTT_SUBSCRIBED +from upparat.events import SELECT_JOB_ACTION_MISMATCH from upparat.events import SELECT_JOB_INTERRUPTED from upparat.jobs import describe_job_execution from upparat.jobs import describe_job_execution_response @@ -21,6 +22,7 @@ from upparat.jobs import Job from upparat.jobs import JOB_ACCEPTED from upparat.jobs import JOB_DOCUMENT +from upparat.jobs import JOB_DOCUMENT_ACTION from upparat.jobs import JOB_DOCUMENT_FILE from upparat.jobs import JOB_DOCUMENT_FORCE from upparat.jobs import JOB_DOCUMENT_META @@ -32,6 +34,7 @@ from upparat.jobs import JOB_STATUS_DETAILS from upparat.jobs import job_update_multiple_as_failed from upparat.jobs import JobProgressStatus +from upparat.jobs import UPPARAT_ACTION from upparat.statemachine import BaseState logger = logging.getLogger(__name__) @@ -128,6 +131,15 @@ def on_message(self, state, event): if topic_matches_sub(accepted_topic, topic): job_execution = payload[EXECUTION] job_document = job_execution[JOB_DOCUMENT] + job_document_action = job_document.get(JOB_DOCUMENT_ACTION) + + # something else can also publish jobs → make sure to only handle the ones for us + if job_document_action != UPPARAT_ACTION: + logger.info( + f"Job ignored: Job document does not match expected Upparat action field {UPPARAT_ACTION}." # noqa + ) + self.publish(Event(SELECT_JOB_ACTION_MISMATCH)) + return job = Job( id_=job_execution[JOB_ID], diff --git a/tests/statemachine/select_job_test.py b/tests/statemachine/select_job_test.py index 0d657a7..bb02458 100644 --- a/tests/statemachine/select_job_test.py +++ b/tests/statemachine/select_job_test.py @@ -9,6 +9,7 @@ from upparat.events import JOB_SELECTED from upparat.events import MQTT_MESSAGE_RECEIVED from upparat.events import MQTT_SUBSCRIBED +from upparat.events import SELECT_JOB_ACTION_MISMATCH from upparat.events import SELECT_JOB_INTERRUPTED from upparat.jobs import describe_job_execution_response from upparat.jobs import Job @@ -175,6 +176,33 @@ def test_on_message_rejected_job(select_job_state, create_mqtt_message_event): assert inbox.empty() +def test_on_message_not_from_upparat_job(select_job_state, create_mqtt_message_event): + state, inbox, _, __ = select_job_state + + state.current_job_id = "42" + settings.broker.thing_name = "bobby" + + event = create_mqtt_message_event( + describe_job_execution_response( + settings.broker.thing_name, state.current_job_id, state_filter=JOB_ACCEPTED + ), + { + "execution": { + "jobId": "42", + "status": JobStatus.IN_PROGRESS.value, + "statusDetails": "...", + "jobDocument": {"jobFromAnotherService": "notFromUpparat"}, + } + }, + ) + + state.on_message(None, event) + + published_event = inbox.get_nowait() + assert published_event.name == SELECT_JOB_ACTION_MISMATCH + assert inbox.empty() + + def test_on_message_accepted_job(select_job_state, create_mqtt_message_event): state, inbox, _, __ = select_job_state @@ -197,6 +225,7 @@ def test_on_message_accepted_job(select_job_state, create_mqtt_message_event): "status": status, "statusDetails": status_details, "jobDocument": { + "action": "upparat-update", "version": version, "file": file_url, "force": force, diff --git a/tests/statemachine/statemachine_transitions_test.py b/tests/statemachine/statemachine_transitions_test.py index 786c308..eec7d31 100644 --- a/tests/statemachine/statemachine_transitions_test.py +++ b/tests/statemachine/statemachine_transitions_test.py @@ -16,6 +16,7 @@ from upparat.events import JOBS_AVAILABLE from upparat.events import NO_JOBS_PENDING from upparat.events import RESTART_INTERRUPTED +from upparat.events import SELECT_JOB_ACTION_MISMATCH from upparat.events import SELECT_JOB_INTERRUPTED from upparat.statemachine.download import DownloadState from upparat.statemachine.fetch_jobs import FetchJobsState @@ -125,6 +126,13 @@ def test_fetch_jobs_pending_jobs_found(fetch_jobs_state): return statemachine, statemachine.state +def test_select_found_job_not_for_upparat(select_job_state): + statemachine, _ = select_job_state + statemachine.dispatch(Event(SELECT_JOB_ACTION_MISMATCH)) + assert isinstance(statemachine.state, MonitorState) + return statemachine, statemachine.state + + def test_select_found_job_to_processs(select_job_state): statemachine, _ = select_job_state statemachine.dispatch(Event(JOB_SELECTED))