From bac32f1edfd8b3c8bb4e7f8d8245c7e5f43b076b Mon Sep 17 00:00:00 2001 From: Livio Bieri Date: Wed, 29 Jul 2020 16:58:44 +0200 Subject: [PATCH] feat: ignore non-upparat AWS IoT Jobs previously, all AWS IoT Jobs would attempt to initiate an update process even if the payload was clearly not meant for Upparat. With this change, Upparat only handles jobs in whose job document the `action` field is set to `upparat-update`. I.e. a valid document would look like this: ``` { "action": "upparat-update", "file": "${aws:iot:s3-presigned-url:https://firmware.s3.amazonaws.com/bundle.raucb}", "version": "1.2.0" } ``` --- misc/upparat_create_job.py | 3 ++ src/upparat/events.py | 1 + src/upparat/jobs.py | 3 ++ src/upparat/statemachine/machine.py | 5 ++++ src/upparat/statemachine/select_job.py | 12 ++++++++ tests/statemachine/select_job_test.py | 29 +++++++++++++++++++ .../statemachine_transitions_test.py | 8 +++++ 7 files changed, 61 insertions(+) diff --git a/misc/upparat_create_job.py b/misc/upparat_create_job.py index dd4c03d..a6e0be6 100755 --- a/misc/upparat_create_job.py +++ b/misc/upparat_create_job.py @@ -169,6 +169,9 @@ def main(arguments): print(f" → ARN IoT: {arguments.arn_iot}") print(f" → Things: {thing_names}") print(f" → Groups: {group_names}\n") + print(f"AWS IoT Job Document:\n") + print(json.dumps(json.loads(document), indent=2)) + print("") if not arguments.dry_run: create_job( diff --git a/src/upparat/events.py b/src/upparat/events.py index 1fd69b0..3b754d5 100644 --- a/src/upparat/events.py +++ b/src/upparat/events.py @@ -11,6 +11,7 @@ JOB_REVOKED = "job-revoked" SELECT_JOB_INTERRUPTED = "selected-job-interrupted" +SELECT_JOB_ACTION_MISMATCH = "selected-job-action-mismatch" DOWNLOAD_COMPLETED = "download-completed" DOWNLOAD_INTERRUPTED = "download-interrupted" diff --git a/src/upparat/jobs.py b/src/upparat/jobs.py index fa0813d..7e01167 100644 --- a/src/upparat/jobs.py +++ b/src/upparat/jobs.py @@ -4,6 +4,8 @@ from upparat.config import settings +UPPARAT_ACTION = "upparat-update" + # AWS job execution EXECUTION = "execution" EXECUTION_STATE = "executionState" @@ -14,6 +16,7 @@ # AWS job document JOB_ID = "jobId" JOB_DOCUMENT = "jobDocument" +JOB_DOCUMENT_ACTION = "action" JOB_DOCUMENT_FILE = "file" JOB_DOCUMENT_VERSION = "version" JOB_DOCUMENT_META = "meta" diff --git a/src/upparat/statemachine/machine.py b/src/upparat/statemachine/machine.py index d5532f2..7de5345 100644 --- a/src/upparat/statemachine/machine.py +++ b/src/upparat/statemachine/machine.py @@ -13,6 +13,7 @@ from upparat.events import JOBS_AVAILABLE from upparat.events import NO_JOBS_PENDING from upparat.events import RESTART_INTERRUPTED +from upparat.events import SELECT_JOB_ACTION_MISMATCH from upparat.events import SELECT_JOB_INTERRUPTED from upparat.statemachine import UpparatStateMachine from upparat.statemachine.download import DownloadState @@ -71,6 +72,10 @@ def create_statemachine(event_queue, mqtt_client): select_job_state, fetch_jobs_state, events=[SELECT_JOB_INTERRUPTED] ) + statemachine.add_transition( + select_job_state, monitor_state, events=[SELECT_JOB_ACTION_MISMATCH] + ) + # Job is ready for process statemachine.add_transition(verify_job_state, download_state, events=[JOB_VERIFIED]) diff --git a/src/upparat/statemachine/select_job.py b/src/upparat/statemachine/select_job.py index 742c6c2..699cae4 100644 --- a/src/upparat/statemachine/select_job.py +++ b/src/upparat/statemachine/select_job.py @@ -14,6 +14,7 @@ from upparat.events import MQTT_EVENT_TOPIC from upparat.events import MQTT_MESSAGE_RECEIVED from upparat.events import MQTT_SUBSCRIBED +from upparat.events import SELECT_JOB_ACTION_MISMATCH from upparat.events import SELECT_JOB_INTERRUPTED from upparat.jobs import describe_job_execution from upparat.jobs import describe_job_execution_response @@ -21,6 +22,7 @@ from upparat.jobs import Job from upparat.jobs import JOB_ACCEPTED from upparat.jobs import JOB_DOCUMENT +from upparat.jobs import JOB_DOCUMENT_ACTION from upparat.jobs import JOB_DOCUMENT_FILE from upparat.jobs import JOB_DOCUMENT_FORCE from upparat.jobs import JOB_DOCUMENT_META @@ -32,6 +34,7 @@ from upparat.jobs import JOB_STATUS_DETAILS from upparat.jobs import job_update_multiple_as_failed from upparat.jobs import JobProgressStatus +from upparat.jobs import UPPARAT_ACTION from upparat.statemachine import BaseState logger = logging.getLogger(__name__) @@ -128,6 +131,15 @@ def on_message(self, state, event): if topic_matches_sub(accepted_topic, topic): job_execution = payload[EXECUTION] job_document = job_execution[JOB_DOCUMENT] + job_document_action = job_document.get(JOB_DOCUMENT_ACTION) + + # something else can also publish jobs → make sure to only handle the ones for us + if job_document_action != UPPARAT_ACTION: + logger.info( + f"Job ignored: Job document does not match expected Upparat action field {UPPARAT_ACTION}." # noqa + ) + self.publish(Event(SELECT_JOB_ACTION_MISMATCH)) + return job = Job( id_=job_execution[JOB_ID], diff --git a/tests/statemachine/select_job_test.py b/tests/statemachine/select_job_test.py index 0d657a7..bb02458 100644 --- a/tests/statemachine/select_job_test.py +++ b/tests/statemachine/select_job_test.py @@ -9,6 +9,7 @@ from upparat.events import JOB_SELECTED from upparat.events import MQTT_MESSAGE_RECEIVED from upparat.events import MQTT_SUBSCRIBED +from upparat.events import SELECT_JOB_ACTION_MISMATCH from upparat.events import SELECT_JOB_INTERRUPTED from upparat.jobs import describe_job_execution_response from upparat.jobs import Job @@ -175,6 +176,33 @@ def test_on_message_rejected_job(select_job_state, create_mqtt_message_event): assert inbox.empty() +def test_on_message_not_from_upparat_job(select_job_state, create_mqtt_message_event): + state, inbox, _, __ = select_job_state + + state.current_job_id = "42" + settings.broker.thing_name = "bobby" + + event = create_mqtt_message_event( + describe_job_execution_response( + settings.broker.thing_name, state.current_job_id, state_filter=JOB_ACCEPTED + ), + { + "execution": { + "jobId": "42", + "status": JobStatus.IN_PROGRESS.value, + "statusDetails": "...", + "jobDocument": {"jobFromAnotherService": "notFromUpparat"}, + } + }, + ) + + state.on_message(None, event) + + published_event = inbox.get_nowait() + assert published_event.name == SELECT_JOB_ACTION_MISMATCH + assert inbox.empty() + + def test_on_message_accepted_job(select_job_state, create_mqtt_message_event): state, inbox, _, __ = select_job_state @@ -197,6 +225,7 @@ def test_on_message_accepted_job(select_job_state, create_mqtt_message_event): "status": status, "statusDetails": status_details, "jobDocument": { + "action": "upparat-update", "version": version, "file": file_url, "force": force, diff --git a/tests/statemachine/statemachine_transitions_test.py b/tests/statemachine/statemachine_transitions_test.py index 786c308..eec7d31 100644 --- a/tests/statemachine/statemachine_transitions_test.py +++ b/tests/statemachine/statemachine_transitions_test.py @@ -16,6 +16,7 @@ from upparat.events import JOBS_AVAILABLE from upparat.events import NO_JOBS_PENDING from upparat.events import RESTART_INTERRUPTED +from upparat.events import SELECT_JOB_ACTION_MISMATCH from upparat.events import SELECT_JOB_INTERRUPTED from upparat.statemachine.download import DownloadState from upparat.statemachine.fetch_jobs import FetchJobsState @@ -125,6 +126,13 @@ def test_fetch_jobs_pending_jobs_found(fetch_jobs_state): return statemachine, statemachine.state +def test_select_found_job_not_for_upparat(select_job_state): + statemachine, _ = select_job_state + statemachine.dispatch(Event(SELECT_JOB_ACTION_MISMATCH)) + assert isinstance(statemachine.state, MonitorState) + return statemachine, statemachine.state + + def test_select_found_job_to_processs(select_job_state): statemachine, _ = select_job_state statemachine.dispatch(Event(JOB_SELECTED))