Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add type information for inbound messages on the DeviceChannel #1444

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/nerves_hub/deployments.ex
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ defmodule NervesHub.Deployments do
archive_id: deployment.archive_id
}

broadcast(deployment, "archives/updated", payload)
_ = broadcast(deployment, "archives/updated", payload)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming this is a dialyzer fix? We should prob include this in a different PR so its away from these changes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, dialyzer


description = "deployment #{deployment.name} has a new archive"
AuditLogs.audit!(deployment, deployment, description)
Expand Down
5 changes: 4 additions & 1 deletion lib/nerves_hub/devices/device.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ defmodule NervesHub.Devices.Device do
:connection_types,
:connection_metadata
]
@connection_types [:cellular, :ethernet, :wifi]
@required_params [:org_id, :product_id, :identifier]

schema "devices" do
Expand All @@ -53,7 +54,7 @@ defmodule NervesHub.Devices.Device do
field(:connection_established_at, :utc_datetime)
field(:connection_disconnected_at, :utc_datetime)
field(:connection_last_seen_at, :utc_datetime)
field(:connection_types, {:array, Ecto.Enum}, values: [:cellular, :ethernet, :wifi])
field(:connection_types, {:array, Ecto.Enum}, values: @connection_types)
field(:connecting_code, :string)
field(:connection_metadata, :map, default: %{})

Expand All @@ -68,4 +69,6 @@ defmodule NervesHub.Devices.Device do
|> validate_length(:tags, min: 1)
|> unique_constraint(:identifier)
end

def connection_types, do: @connection_types
end
30 changes: 18 additions & 12 deletions lib/nerves_hub_web/channels/device_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ defmodule NervesHubWeb.DeviceChannel do
alias NervesHub.Repo
alias NervesHub.Tracker
alias Phoenix.Socket.Broadcast
alias NervesHubWeb.DeviceChannel.Messages

def join("device", params, %{assigns: %{device: device}} = socket) do
with {:ok, device} <- update_metadata(device, params),
Expand Down Expand Up @@ -413,7 +414,12 @@ defmodule NervesHubWeb.DeviceChannel do
{:noreply, socket}
end

def handle_in("fwup_progress", %{"value" => percent}, %{assigns: %{device: device}} = socket) do
def handle_in(event, params, socket) when is_binary(event) do
{event_atom, parsed_params} = Messages.parse(event, params)
handle_in(event_atom, parsed_params, socket)
end

def handle_in(:fwup_progress, %{percent: percent}, %{assigns: %{device: device}} = socket) do
NervesHubWeb.DeviceEndpoint.broadcast_from!(
self(),
"device:#{device.identifier}:internal",
Expand Down Expand Up @@ -448,7 +454,7 @@ defmodule NervesHubWeb.DeviceChannel do
end
end

def handle_in("location:update", location, %{assigns: %{device: device}} = socket) do
def handle_in(:location_update, location, %{assigns: %{device: device}} = socket) do
metadata = Map.put(device.connection_metadata, "location", location)

{:ok, device} = Devices.update_device(device, %{connection_metadata: metadata})
Expand All @@ -463,17 +469,17 @@ defmodule NervesHubWeb.DeviceChannel do
{:reply, :ok, assign(socket, :device, device)}
end

def handle_in("connection_types", %{"values" => types}, %{assigns: %{device: device}} = socket) do
def handle_in(:connection_types, %{types: types}, %{assigns: %{device: device}} = socket) do
{:ok, device} = Devices.update_device(device, %{"connection_types" => types})
{:noreply, assign(socket, :device, device)}
end

def handle_in("status_update", %{"status" => _status}, socket) do
def handle_in(:status_update, %{}, socket) do
# TODO store in tracker or the database?
{:noreply, socket}
end

def handle_in("check_update_available", _params, socket) do
def handle_in(:check_update_available, _params, socket) do
device =
socket.assigns.device
|> Devices.verify_deployment()
Expand All @@ -487,29 +493,29 @@ defmodule NervesHubWeb.DeviceChannel do
{:reply, {:ok, update_payload}, socket}
end

def handle_in("rebooting", _, socket) do
def handle_in(:rebooting, %{}, socket) do
{:noreply, socket}
end

def handle_in("scripts/run", params, socket) do
if pid = socket.assigns.script_refs[params["ref"]] do
output = Enum.join([params["output"], params["return"]], "\n")
def handle_in(:scripts_run, %{ref: ref, output: output, return: return}, socket) do
if pid = socket.assigns.script_refs[ref] do
output = Enum.join([output, return], "\n")
output = String.trim(output)
send(pid, {:output, output})
end

{:noreply, socket}
end

def handle_in("health_check_report", %{"value" => device_status}, socket) do
def handle_in(:health_check_report, device_status, socket) do
device_meta =
for {key, val} <- Map.from_struct(socket.assigns.device.firmware_metadata),
into: %{},
do: {to_string(key), to_string(val)}
do: {key, to_string(val)}

full_report =
device_status
|> Map.put("metadata", Map.merge(device_status["metadata"], device_meta))
|> Map.put(:metadata, Map.merge(device_status.metadata, device_meta))

device_health = %{"device_id" => socket.assigns.device.id, "data" => full_report}

Expand Down
135 changes: 135 additions & 0 deletions lib/nerves_hub_web/channels/device_channel/messages.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
defmodule NervesHubWeb.DeviceChannel.Messages do
@moduledoc false
alias NervesHub.Devices.Device

require Logger
@type alarm_id() :: String.t()
@type alarm_description() :: String.t()

@type health_check_report() :: %{
timestamp: DateTime.t(),
metadata: %{String.t() => String.t()},
alarms: %{alarm_id() => alarm_description()},
metrics: %{String.t() => number()},
checks: %{String.t() => %{pass: boolean(), note: String.t()}}
}

@type scripts_run() :: %{
ref: String.t(),
output: String.t(),
return: String.t()
}

@type fwup_progress() :: %{
percent: integer()
}

@type location() :: term()

@type connection_types() :: %{types: list(atom())}

@type status_update() :: map()

@type check_update_available() :: map()

# We parse out messages explicitly to let the compiler help with types and
# to keep track of what we have coming in and out of the system
# They are not structs to reduce the proliferation of modules for what is mostly
# an inbetween layer
# If the role of these definitions grows to much it may make sense to turn them into
# structs.
@spec parse(event :: String.t(), params :: map()) ::
{:fwup_progress, fwup_progress()}
| {:location_update, location()}
| {:connection_types, connection_types()}
| {:status_update, status_update()}
| {:check_update_available, check_update_available()}
| {:health_check_report, health_check_report()}
| {:scripts_run, scripts_run()}
| {:rebooting, map()}
| {:unknown, map()}
def parse(event, params)

def parse("fwup_progress", %{"value" => percent}) do
{:fwup_progress, %{percent: percent}}
end

def parse("location:update", location) do
{:location_update, location}
end

@valid_types Device.connection_types()
def parse("connection_types", %{"values" => types}) do
types =
types
|> Enum.map(fn type ->
try do
String.to_existing_atom(type)
rescue
_ -> nil
end
end)
|> Enum.filter(fn type ->
if type in @valid_types do
true
else
Logger.warning("Received invalid type for connection_types: #{inspect(type)}")
false
end
end)
Comment on lines +64 to +79
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
types
|> Enum.map(fn type ->
try do
String.to_existing_atom(type)
rescue
_ -> nil
end
end)
|> Enum.filter(fn type ->
if type in @valid_types do
true
else
Logger.warning("Received invalid type for connection_types: #{inspect(type)}")
false
end
end)
for {type, type_str} <- Ecto.Enum.mappings(Device, :connection_types), type_str in types, do: type

We can use the Ecto.Enum mappings to get the valid types out of this. I'm unsure we need to warn on invalid being included? (this would mostly be hidden to the users anyway)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just figured we'd do something when we see weird things :)

Would Ecto.Enum be smoother somehow or are you thinking full-on embedded schema?


{:connection_types, %{types: types}}
end

def parse("status_update", %{"status" => _status}) do
{:status_update, %{}}
end

def parse("check_update_available", _params) do
{:check_update_available, %{}}
end

def parse("health_check_report", %{
"value" => %{
"timestamp" => iso_ts,
"metadata" => metadata,
"alarms" => alarms,
"metrics" => metrics,
"checks" => checks
}
}) do
{:ok, ts, _} = DateTime.from_iso8601(iso_ts)

status = %{
timestamp: ts,
metadata: metadata,
alarms: alarms,
metrics: metrics,
checks: to_checks(checks)
}

{:health_check_report, status}
end

def parse("scripts/run", %{"ref" => ref, "output" => output, "return" => return}) do
{:scripts_run, %{ref: ref, output: output, return: return}}
end

def parse("rebooting", _) do
{:rebooting, %{}}
end

def parse(event, params) do
Logger.warning(
"Unmatched incoming event in device channel messages '#{event}' with #{inspect(params)}"
)

{:unknown, params}
end

defp to_checks(checks) do
for {key, %{"pass" => pass, "note" => note}} <- checks, into: %{} do
{key, %{pass: pass, note: note}}
end
end
end
Loading