Skip to content

Commit

Permalink
feat: ignore non-upparat AWS IoT Jobs
Browse files Browse the repository at this point in the history
previously, all AWS IoT Jobs would attempt to initiate an update
process even if the payload was clearly not meant for Upparat.

With this change, Upparat only handles jobs in whose job document
the `action` field is set to `upparat-update`.

I.e. a valid document would look like this:

```
{
  "action": "upparat-update",
  "file": "${aws:iot:s3-presigned-url:https://firmware.s3.amazonaws.com/bundle.raucb}",
  "version": "1.2.0"
}
```
  • Loading branch information
livioso committed Jul 30, 2020
1 parent 393540a commit bac32f1
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 0 deletions.
3 changes: 3 additions & 0 deletions misc/upparat_create_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ def main(arguments):
print(f" → ARN IoT: {arguments.arn_iot}")
print(f" → Things: {thing_names}")
print(f" → Groups: {group_names}\n")
print(f"AWS IoT Job Document:\n")
print(json.dumps(json.loads(document), indent=2))
print("")

if not arguments.dry_run:
create_job(
Expand Down
1 change: 1 addition & 0 deletions src/upparat/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
JOB_REVOKED = "job-revoked"

SELECT_JOB_INTERRUPTED = "selected-job-interrupted"
SELECT_JOB_ACTION_MISMATCH = "selected-job-action-mismatch"

DOWNLOAD_COMPLETED = "download-completed"
DOWNLOAD_INTERRUPTED = "download-interrupted"
Expand Down
3 changes: 3 additions & 0 deletions src/upparat/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

from upparat.config import settings

UPPARAT_ACTION = "upparat-update"

# AWS job execution
EXECUTION = "execution"
EXECUTION_STATE = "executionState"
Expand All @@ -14,6 +16,7 @@
# AWS job document
JOB_ID = "jobId"
JOB_DOCUMENT = "jobDocument"
JOB_DOCUMENT_ACTION = "action"
JOB_DOCUMENT_FILE = "file"
JOB_DOCUMENT_VERSION = "version"
JOB_DOCUMENT_META = "meta"
Expand Down
5 changes: 5 additions & 0 deletions src/upparat/statemachine/machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from upparat.events import JOBS_AVAILABLE
from upparat.events import NO_JOBS_PENDING
from upparat.events import RESTART_INTERRUPTED
from upparat.events import SELECT_JOB_ACTION_MISMATCH
from upparat.events import SELECT_JOB_INTERRUPTED
from upparat.statemachine import UpparatStateMachine
from upparat.statemachine.download import DownloadState
Expand Down Expand Up @@ -71,6 +72,10 @@ def create_statemachine(event_queue, mqtt_client):
select_job_state, fetch_jobs_state, events=[SELECT_JOB_INTERRUPTED]
)

statemachine.add_transition(
select_job_state, monitor_state, events=[SELECT_JOB_ACTION_MISMATCH]
)

# Job is ready for process
statemachine.add_transition(verify_job_state, download_state, events=[JOB_VERIFIED])

Expand Down
12 changes: 12 additions & 0 deletions src/upparat/statemachine/select_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
from upparat.events import MQTT_EVENT_TOPIC
from upparat.events import MQTT_MESSAGE_RECEIVED
from upparat.events import MQTT_SUBSCRIBED
from upparat.events import SELECT_JOB_ACTION_MISMATCH
from upparat.events import SELECT_JOB_INTERRUPTED
from upparat.jobs import describe_job_execution
from upparat.jobs import describe_job_execution_response
from upparat.jobs import EXECUTION
from upparat.jobs import Job
from upparat.jobs import JOB_ACCEPTED
from upparat.jobs import JOB_DOCUMENT
from upparat.jobs import JOB_DOCUMENT_ACTION
from upparat.jobs import JOB_DOCUMENT_FILE
from upparat.jobs import JOB_DOCUMENT_FORCE
from upparat.jobs import JOB_DOCUMENT_META
Expand All @@ -32,6 +34,7 @@
from upparat.jobs import JOB_STATUS_DETAILS
from upparat.jobs import job_update_multiple_as_failed
from upparat.jobs import JobProgressStatus
from upparat.jobs import UPPARAT_ACTION
from upparat.statemachine import BaseState

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -128,6 +131,15 @@ def on_message(self, state, event):
if topic_matches_sub(accepted_topic, topic):
job_execution = payload[EXECUTION]
job_document = job_execution[JOB_DOCUMENT]
job_document_action = job_document.get(JOB_DOCUMENT_ACTION)

# something else can also publish jobs → make sure to only handle the ones for us
if job_document_action != UPPARAT_ACTION:
logger.info(
f"Job ignored: Job document does not match expected Upparat action field {UPPARAT_ACTION}." # noqa
)
self.publish(Event(SELECT_JOB_ACTION_MISMATCH))
return

job = Job(
id_=job_execution[JOB_ID],
Expand Down
29 changes: 29 additions & 0 deletions tests/statemachine/select_job_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from upparat.events import JOB_SELECTED
from upparat.events import MQTT_MESSAGE_RECEIVED
from upparat.events import MQTT_SUBSCRIBED
from upparat.events import SELECT_JOB_ACTION_MISMATCH
from upparat.events import SELECT_JOB_INTERRUPTED
from upparat.jobs import describe_job_execution_response
from upparat.jobs import Job
Expand Down Expand Up @@ -175,6 +176,33 @@ def test_on_message_rejected_job(select_job_state, create_mqtt_message_event):
assert inbox.empty()


def test_on_message_not_from_upparat_job(select_job_state, create_mqtt_message_event):
state, inbox, _, __ = select_job_state

state.current_job_id = "42"
settings.broker.thing_name = "bobby"

event = create_mqtt_message_event(
describe_job_execution_response(
settings.broker.thing_name, state.current_job_id, state_filter=JOB_ACCEPTED
),
{
"execution": {
"jobId": "42",
"status": JobStatus.IN_PROGRESS.value,
"statusDetails": "...",
"jobDocument": {"jobFromAnotherService": "notFromUpparat"},
}
},
)

state.on_message(None, event)

published_event = inbox.get_nowait()
assert published_event.name == SELECT_JOB_ACTION_MISMATCH
assert inbox.empty()


def test_on_message_accepted_job(select_job_state, create_mqtt_message_event):
state, inbox, _, __ = select_job_state

Expand All @@ -197,6 +225,7 @@ def test_on_message_accepted_job(select_job_state, create_mqtt_message_event):
"status": status,
"statusDetails": status_details,
"jobDocument": {
"action": "upparat-update",
"version": version,
"file": file_url,
"force": force,
Expand Down
8 changes: 8 additions & 0 deletions tests/statemachine/statemachine_transitions_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from upparat.events import JOBS_AVAILABLE
from upparat.events import NO_JOBS_PENDING
from upparat.events import RESTART_INTERRUPTED
from upparat.events import SELECT_JOB_ACTION_MISMATCH
from upparat.events import SELECT_JOB_INTERRUPTED
from upparat.statemachine.download import DownloadState
from upparat.statemachine.fetch_jobs import FetchJobsState
Expand Down Expand Up @@ -125,6 +126,13 @@ def test_fetch_jobs_pending_jobs_found(fetch_jobs_state):
return statemachine, statemachine.state


def test_select_found_job_not_for_upparat(select_job_state):
statemachine, _ = select_job_state
statemachine.dispatch(Event(SELECT_JOB_ACTION_MISMATCH))
assert isinstance(statemachine.state, MonitorState)
return statemachine, statemachine.state


def test_select_found_job_to_processs(select_job_state):
statemachine, _ = select_job_state
statemachine.dispatch(Event(JOB_SELECTED))
Expand Down

0 comments on commit bac32f1

Please sign in to comment.