Skip to content

Commit

Permalink
feat: only work with prefixed jobs (#23)
Browse files Browse the repository at this point in the history
only work with prefixed jobs

note: this is most likely a temporary "workaround",
the proper solution would probably be to use namespaces.

Right now, AWS IoT Job Namespaces are in public preview.

This fixes the issue that Upparat has a monopoly on AWS IoT Job API,
i.e. allows adding other services to also use AWS IoT Jobs on the same
thing.
  • Loading branch information
livioso authored Dec 3, 2020
1 parent 83d00bd commit 38bc1eb
Show file tree
Hide file tree
Showing 16 changed files with 132 additions and 94 deletions.
1 change: 0 additions & 1 deletion src/upparat/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
JOB_REVOKED = "job-revoked"

SELECT_JOB_INTERRUPTED = "selected-job-interrupted"
SELECT_JOB_ACTION_MISMATCH = "selected-job-action-mismatch"

DOWNLOAD_COMPLETED = "download-completed"
DOWNLOAD_INTERRUPTED = "download-interrupted"
Expand Down
17 changes: 15 additions & 2 deletions src/upparat/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@

from upparat.config import settings

UPPARAT_ACTION = "upparat-update"
# FIXME: Prefix jobs since there could be
# multiple services consuming AWS IoT Jobs.
# Revisit this once AWS roles out AWS IoT Job
# Namespaces which would be the "correct" solution.
UPPARAT_JOB_PREFIX = "upparat_"

# AWS job execution
EXECUTION = "execution"
Expand All @@ -16,7 +20,6 @@
# AWS job document
JOB_ID = "jobId"
JOB_DOCUMENT = "jobDocument"
JOB_DOCUMENT_ACTION = "action"
JOB_DOCUMENT_FILE = "file"
JOB_DOCUMENT_VERSION = "version"
JOB_DOCUMENT_META = "meta"
Expand Down Expand Up @@ -80,6 +83,10 @@ class JobProgressStatus(Enum):
ERROR_MULTIPLE_IN_PROGRESS = "error_multiple_in_progress"


def is_upparat_job_id(job_id):
return job_id.startswith(UPPARAT_JOB_PREFIX)


def jobs_base(thing_name):
return f"$aws/things/{thing_name}/jobs/"

Expand Down Expand Up @@ -131,6 +138,12 @@ def job_update(mqtt_client, thing_name, job_id, status, state, message=None):
)


def filter_upparat_job_exectutions(job_executions):
return [
job for job in job_executions if job["jobId"].startswith(UPPARAT_JOB_PREFIX)
]


def job_update_multiple_as_failed(
mqtt_client, thing_name, job_ids, state, message=None
):
Expand Down
11 changes: 9 additions & 2 deletions src/upparat/statemachine/fetch_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from upparat.events import MQTT_MESSAGE_RECEIVED
from upparat.events import MQTT_SUBSCRIBED
from upparat.events import NO_JOBS_PENDING
from upparat.jobs import filter_upparat_job_exectutions
from upparat.jobs import get_pending_job_executions
from upparat.jobs import get_pending_job_executions_response
from upparat.statemachine import BaseState
Expand Down Expand Up @@ -54,8 +55,14 @@ def on_message(self, state, event):

# Handle accepted pending jobs executions
if topic_matches_sub(self.get_pending_job_executions_response, topic):
in_progress_job_executions = payload.get(IN_PROGRESS_JOBS, [])
queued_job_executions = payload.get(QUEUED_JOBS, [])
in_progress_job_executions = filter_upparat_job_exectutions(
payload.get(IN_PROGRESS_JOBS, [])
)

queued_job_executions = filter_upparat_job_exectutions(
payload.get(QUEUED_JOBS, [])
)

# If there are jobs available go to prepare state
if in_progress_job_executions or queued_job_executions:
logger.debug("Job executions available.")
Expand Down
5 changes: 0 additions & 5 deletions src/upparat/statemachine/machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from upparat.events import JOBS_AVAILABLE
from upparat.events import NO_JOBS_PENDING
from upparat.events import RESTART_INTERRUPTED
from upparat.events import SELECT_JOB_ACTION_MISMATCH
from upparat.events import SELECT_JOB_INTERRUPTED
from upparat.statemachine import UpparatStateMachine
from upparat.statemachine.download import DownloadState
Expand Down Expand Up @@ -72,10 +71,6 @@ def create_statemachine(event_queue, mqtt_client):
select_job_state, fetch_jobs_state, events=[SELECT_JOB_INTERRUPTED]
)

statemachine.add_transition(
select_job_state, monitor_state, events=[SELECT_JOB_ACTION_MISMATCH]
)

# Job is ready for process
statemachine.add_transition(verify_job_state, download_state, events=[JOB_VERIFIED])

Expand Down
11 changes: 9 additions & 2 deletions src/upparat/statemachine/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from upparat.events import MQTT_EVENT_PAYLOAD
from upparat.events import MQTT_EVENT_TOPIC
from upparat.events import MQTT_MESSAGE_RECEIVED
from upparat.jobs import filter_upparat_job_exectutions
from upparat.jobs import pending_jobs_response
from upparat.statemachine import BaseState

Expand Down Expand Up @@ -41,8 +42,14 @@ def on_message(self, state, event):

if topic_matches_sub(self.job_pending_response, topic):
payload = json.loads(event.cargo[MQTT_EVENT_PAYLOAD])
in_progress_job_executions = payload["jobs"].get(JOBS_IN_PROGRESS, [])
queued_job_executions = payload["jobs"].get(JOBS_QUEUED, [])

in_progress_job_executions = filter_upparat_job_exectutions(
payload["jobs"].get(JOBS_IN_PROGRESS, [])
)

queued_job_executions = filter_upparat_job_exectutions(
payload["jobs"].get(JOBS_QUEUED, [])
)

# If there are jobs available go to job selection state
if in_progress_job_executions or queued_job_executions:
Expand Down
12 changes: 0 additions & 12 deletions src/upparat/statemachine/select_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@
from upparat.events import MQTT_EVENT_TOPIC
from upparat.events import MQTT_MESSAGE_RECEIVED
from upparat.events import MQTT_SUBSCRIBED
from upparat.events import SELECT_JOB_ACTION_MISMATCH
from upparat.events import SELECT_JOB_INTERRUPTED
from upparat.jobs import describe_job_execution
from upparat.jobs import describe_job_execution_response
from upparat.jobs import EXECUTION
from upparat.jobs import Job
from upparat.jobs import JOB_ACCEPTED
from upparat.jobs import JOB_DOCUMENT
from upparat.jobs import JOB_DOCUMENT_ACTION
from upparat.jobs import JOB_DOCUMENT_FILE
from upparat.jobs import JOB_DOCUMENT_FORCE
from upparat.jobs import JOB_DOCUMENT_META
Expand All @@ -34,7 +32,6 @@
from upparat.jobs import JOB_STATUS_DETAILS
from upparat.jobs import job_update_multiple_as_failed
from upparat.jobs import JobProgressStatus
from upparat.jobs import UPPARAT_ACTION
from upparat.statemachine import BaseState

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -131,15 +128,6 @@ def on_message(self, state, event):
if topic_matches_sub(accepted_topic, topic):
job_execution = payload[EXECUTION]
job_document = job_execution[JOB_DOCUMENT]
job_document_action = job_document.get(JOB_DOCUMENT_ACTION)

# something else can also publish jobs → make sure to only handle the ones for us
if job_document_action != UPPARAT_ACTION:
logger.info(
f"Job ignored: Job document does not match expected Upparat action field {UPPARAT_ACTION}." # noqa
)
self.publish(Event(SELECT_JOB_ACTION_MISMATCH))
return

job = Job(
id_=job_execution[JOB_ID],
Expand Down
4 changes: 3 additions & 1 deletion tests/statemachine/download_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import pytest

from ..utils import create_hook_event # noqa: F401
from ..utils import create_mqtt_message_event # noqa: F401
from ..utils import generate_random_job_id
from upparat.config import settings
from upparat.events import DOWNLOAD_COMPLETED
from upparat.events import DOWNLOAD_INTERRUPTED
Expand Down Expand Up @@ -55,7 +57,7 @@ def download_state(mocker, tmpdir):
state = DownloadState()

state.job = Job(
id_="424242",
id_=generate_random_job_id(),
status=JobStatus.IN_PROGRESS,
file_url="https://foo.bar/baz",
version="1.1.1",
Expand Down
33 changes: 28 additions & 5 deletions tests/statemachine/fetch_jobs_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from ..utils import create_mqtt_message_event # noqa: F401
from ..utils import create_mqtt_subscription_event # noqa: F401
from ..utils import generate_random_job_id
from upparat.config import settings
from upparat.events import JOBS_AVAILABLE
from upparat.events import MQTT_MESSAGE_RECEIVED
Expand All @@ -13,6 +14,16 @@
from upparat.statemachine import UpparatStateMachine
from upparat.statemachine.fetch_jobs import FetchJobsState

NON_UPPARAT_IN_PROGRESS_JOBS = [
{"jobId": "non_upparat_job_in_progress_1"},
{"jobId": "non_upparat_job_in_progress_2"},
]

NON_UPPARAT_QUEUED_JOBS = [
{"jobId": "non_upparat_job_queued_3"},
{"jobId": "non_upparat_job_queued_4"},
]


@pytest.fixture
def fetch_jobs_state(mocker):
Expand Down Expand Up @@ -63,7 +74,11 @@ def test_on_message_no_pending_jobs(fetch_jobs_state, create_mqtt_message_event)
state.on_enter(None, None)

topic = f"$aws/things/{settings.broker.thing_name}/jobs/get/+"
payload = {"queuedJobs": [], "inProgressJobs": []}

payload = {
"queuedJobs": NON_UPPARAT_QUEUED_JOBS,
"inProgressJobs": NON_UPPARAT_IN_PROGRESS_JOBS,
}

mqtt_message_event = create_mqtt_message_event(topic, payload)
state.on_message(None, mqtt_message_event)
Expand All @@ -79,9 +94,13 @@ def test_on_message_pending_queued_jobs(fetch_jobs_state, create_mqtt_message_ev
settings.broker.thing_name = "bobby"
state.on_enter(None, None)

queued_job = {"jobId": "42"}
queued_job = {"jobId": generate_random_job_id()}
topic = f"$aws/things/{settings.broker.thing_name}/jobs/get/+"
payload = {"queuedJobs": [queued_job], "inProgressJobs": []}

payload = {
"queuedJobs": NON_UPPARAT_QUEUED_JOBS + [queued_job],
"inProgressJobs": NON_UPPARAT_IN_PROGRESS_JOBS,
}

mqtt_message_event = create_mqtt_message_event(topic, payload)
state.on_message(None, mqtt_message_event)
Expand All @@ -104,9 +123,13 @@ def test_on_message_pending_progress_jobs(fetch_jobs_state, create_mqtt_message_
settings.broker.thing_name = "bobby"
state.on_enter(None, None)

progress_job = {"jobId": "42"}
progress_job = {"jobId": generate_random_job_id()}
topic = f"$aws/things/{settings.broker.thing_name}/jobs/get/+"
payload = {"queuedJobs": [], "inProgressJobs": [progress_job]}

payload = {
"queuedJobs": NON_UPPARAT_QUEUED_JOBS,
"inProgressJobs": NON_UPPARAT_IN_PROGRESS_JOBS + [progress_job],
}

mqtt_message_event = create_mqtt_message_event(topic, payload)
state.on_message(None, mqtt_message_event)
Expand Down
3 changes: 2 additions & 1 deletion tests/statemachine/install_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pytest

from ..utils import create_hook_event # noqa: F401
from ..utils import generate_random_job_id
from upparat.config import settings
from upparat.events import HOOK
from upparat.events import HOOK_STATUS_COMPLETED
Expand All @@ -20,7 +21,7 @@
from upparat.statemachine.install import InstallState

JOB_ = Job(
"42",
generate_random_job_id(),
JobStatus.IN_PROGRESS,
"http://foo.bar/baz.bin",
"1.0.0",
Expand Down
31 changes: 27 additions & 4 deletions tests/statemachine/monitor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,23 @@

from ..utils import create_mqtt_message_event # noqa: F401
from ..utils import create_mqtt_subscription_event # noqa: F401
from ..utils import generate_random_job_id
from upparat.config import settings
from upparat.events import JOBS_AVAILABLE
from upparat.events import MQTT_MESSAGE_RECEIVED
from upparat.statemachine import UpparatStateMachine
from upparat.statemachine.monitor import MonitorState

NON_UPPARAT_IN_PROGRESS_JOBS = [
{"jobId": "non_upparat_job_in_progress_1"},
{"jobId": "non_upparat_job_in_progress_2"},
]

NON_UPPARAT_QUEUED_JOBS = [
{"jobId": "non_upparat_job_queued_3"},
{"jobId": "non_upparat_job_queued_4"},
]


@pytest.fixture
def monitor_state(mocker):
Expand Down Expand Up @@ -72,9 +83,15 @@ def test_on_message_pending_queued_jobs(monitor_state, create_mqtt_message_event
settings.broker.thing_name = "bobby"
state.on_enter(None, None)

queued_job = {"jobId": "42"}
queued_job = {"jobId": generate_random_job_id()}
topic = f"$aws/things/{settings.broker.thing_name}/jobs/notify"
payload = {"jobs": {"IN_PROGRESS": [], "QUEUED": [queued_job]}}

payload = {
"jobs": {
"IN_PROGRESS": NON_UPPARAT_IN_PROGRESS_JOBS,
"QUEUED": NON_UPPARAT_QUEUED_JOBS + [queued_job],
}
}

mqtt_message_event = create_mqtt_message_event(topic, payload)
state.on_message(None, mqtt_message_event)
Expand All @@ -97,9 +114,15 @@ def test_on_message_pending_progress_jobs(monitor_state, create_mqtt_message_eve
settings.broker.thing_name = "bobby"
state.on_enter(None, None)

progress_job = {"jobId": "42"}
progress_job = {"jobId": generate_random_job_id()}
topic = f"$aws/things/{settings.broker.thing_name}/jobs/notify"
payload = {"jobs": {"IN_PROGRESS": [progress_job], "QUEUED": []}}

payload = {
"jobs": {
"IN_PROGRESS": NON_UPPARAT_IN_PROGRESS_JOBS + [progress_job],
"QUEUED": NON_UPPARAT_QUEUED_JOBS,
}
}

mqtt_message_event = create_mqtt_message_event(topic, payload)
state.on_message(None, mqtt_message_event)
Expand Down
3 changes: 2 additions & 1 deletion tests/statemachine/restart_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pytest

from ..utils import create_hook_event # noqa: F401
from ..utils import generate_random_job_id
from upparat.config import settings
from upparat.events import HOOK
from upparat.events import HOOK_STATUS_COMPLETED
Expand All @@ -19,7 +20,7 @@
from upparat.statemachine.restart import RestartState

JOB_ = Job(
"42",
generate_random_job_id(),
JobStatus.IN_PROGRESS,
"http://foo.bar/baz.bin",
"1.0.0",
Expand Down
Loading

0 comments on commit 38bc1eb

Please sign in to comment.