Skip to content

Commit

Permalink
fix(host_core): Makes chunking work in all places (#380)
Browse files Browse the repository at this point in the history
This now makes it so that actor to actor call chunking works as well
as provider to actor call chunking.

Note: I will come back and add a test for actor to actor chunking once
I merge in the new changes to the petclinic

Signed-off-by: Taylor Thomas <[email protected]>
  • Loading branch information
thomastaylor312 authored Apr 14, 2022
1 parent 46011fd commit 7779985
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 12 deletions.
12 changes: 10 additions & 2 deletions host_core/lib/host_core/actors/actor_module.ex
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ defmodule HostCore.Actors.ActorModule do
)
when byte_size(response) > 700 * 1024 do
with :ok <- HostCore.WasmCloud.Native.chunk_inv("#{invid}-r", response) do
%{map | msg: nil}
%{map | msg: <<>>}
else
_ ->
map
Expand All @@ -293,7 +293,15 @@ defmodule HostCore.Actors.ActorModule do
invocation_id: inv_id
)

HostCore.WasmCloud.Native.dechunk_inv(inv_id)
case HostCore.WasmCloud.Native.dechunk_inv(inv_id) do
{:ok, bytes} ->
bytes

{:error, e} ->
Logger.error("Failed to dechunk invocation response: #{inspect(e)}")

<<>>
end
else
bytes
end
Expand Down
17 changes: 12 additions & 5 deletions host_core/lib/host_core/web_assembly/imports.ex
Original file line number Diff line number Diff line change
Expand Up @@ -424,15 +424,18 @@ defmodule HostCore.WebAssembly.Imports do
ir = res |> Msgpax.unpack!()

if ir["error"] == nil do
{1, :host_response, check_dechunk(ir)}
{1, :host_response, check_dechunk(ir) |> IO.iodata_to_binary()}
else
{0, :host_error, ir["error"]}
end
end
end

defp safe_bsize(nil), do: 0
defp safe_bsize(b) when is_binary(b), do: byte_size(b)

defp check_dechunk(ir) do
bsize = byte_size(Map.get(ir, "msg", <<>>))
bsize = safe_bsize(Map.get(ir, "msg", <<>>))
invid = "#{ir["invocation_id"]}-r"

# if declared content size is greater than the actual (e.g. empty payload) then
Expand All @@ -441,8 +444,12 @@ defmodule HostCore.WebAssembly.Imports do
{:ok, bytes} <- HostCore.WasmCloud.Native.dechunk_inv(invid) do
bytes
else
{:error, e} ->
Logger.error("Failed to dechunk invocation response: #{inspect(e)}")
<<>>

_ ->
ir["msg"]
Map.get(ir, "msg", <<>>)
end
end

Expand All @@ -455,7 +462,7 @@ defmodule HostCore.WebAssembly.Imports do

defp host_response_len(_api_type, _context, agent) do
if (hr = Agent.get(agent, fn content -> content.host_response end)) != nil do
byte_size(hr)
safe_bsize(hr)
else
0
end
Expand All @@ -470,7 +477,7 @@ defmodule HostCore.WebAssembly.Imports do

defp host_error_len(_api_type, _context, agent) do
if (he = Agent.get(agent, fn content -> content.host_error end)) != nil do
byte_size(he)
safe_bsize(he)
else
0
end
Expand Down
3 changes: 1 addition & 2 deletions host_core/native/hostcore_wasmcloud_native/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ data-encoding = "2.3.2"
rmp = "0.8.10"
rmp-serde = "1.0.0"
oci-distribution = "0.8.1"
# provider-archive = "0.4.0"
provider-archive = "0.6.0"
tokio = {version = "1.7.1", features = ["rt", "rt-multi-thread"] }
once_cell = "1.2.0"
Expand All @@ -32,4 +31,4 @@ tokio-stream = "0.1"
bindle = { version = "0.8", default-features = false, features = ["client", "caching"] }
async-trait = "0.1"
reqwest = "0.11"
nats = "0.18.1"
nats = "0.18.1"
4 changes: 2 additions & 2 deletions host_core/native/hostcore_wasmcloud_native/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,8 @@ fn generate_key(key_type: KeyType) -> Result<(String, String), Error> {
}

#[rustler::nif(schedule = "DirtyIo")]
fn dechunk_inv(inv_id: String) -> Result<Vec<u8>, Error> {
objstore::unchonk_from_object_store(&inv_id)
fn dechunk_inv(inv_id: String) -> Result<(Atom, Vec<u8>), Error> {
Ok((atoms::ok(), objstore::unchonk_from_object_store(&inv_id)?))
}

#[rustler::nif(schedule = "DirtyIo")]
Expand Down
9 changes: 8 additions & 1 deletion host_core/test/host_core/actors_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,14 @@ defmodule HostCore.ActorsTest do

assert res != :fail
ir = res |> Msgpax.unpack!()
ir = Map.put(ir, "msg", HostCore.WasmCloud.Native.dechunk_inv("#{ir["invocation_id"]}-r"))

ir =
case HostCore.WasmCloud.Native.dechunk_inv("#{ir["invocation_id"]}-r") do
{:ok, resp} -> Map.put(ir, "msg", resp)
{:error, _e} -> :fail
end

assert ir != :fail

# NOTE: this is using "magic knowledge" that the HTTP server provider is using
# msgpack to communicate with actors
Expand Down

0 comments on commit 7779985

Please sign in to comment.