diff --git a/lib/nodejs/worker.ex b/lib/nodejs/worker.ex index 37fb962..7daf05d 100644 --- a/lib/nodejs/worker.ex +++ b/lib/nodejs/worker.ex @@ -1,6 +1,9 @@ defmodule NodeJS.Worker do use GenServer + # Port can't do more than this. + @read_chunk_size 65_536 + @moduledoc """ A genserver that controls the starting of the node service """ @@ -21,15 +24,42 @@ defmodule NodeJS.Worker do @doc false def init(module_path) do node = System.find_executable("node") - port = Port.open({:spawn_executable, node}, env: [{'NODE_PATH', String.to_charlist(module_path)}], args: [node_service_path()]) + + port = + Port.open( + {:spawn_executable, node}, + line: @read_chunk_size, + env: [ + {'NODE_PATH', String.to_charlist(module_path)}, + {'WRITE_CHUNK_SIZE', String.to_charlist("#{@read_chunk_size}")} + ], + args: [node_service_path()] + ) + {:ok, [node_service_path(), port]} end + defp get_response(data \\ '') do + receive do + {_, {:data, {flag, chunk}}} -> + data = data ++ chunk + + case flag do + :noeol -> get_response(data) + :eol -> data + end + end + end + @doc false def handle_call({module, args}, _from, [_, port] = state) when is_tuple(module) do body = Jason.encode!([Tuple.to_list(module), args]) Port.command(port, "#{body}\n") - response = receive do {_, {:data, data}} -> decode(data) end + + response = + get_response() + |> decode() + {:reply, response, state} end diff --git a/mix.exs b/mix.exs index 71379fd..5969aa9 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule NodeJS.MixProject do def project do [ app: :nodejs, - version: "0.2.3", + version: "1.0.0", elixir: "~> 1.6", start_permanent: Mix.env() == :prod, deps: deps(), diff --git a/priv/server.js b/priv/server.js index 811110b..edb382c 100644 --- a/priv/server.js +++ b/priv/server.js @@ -1,6 +1,7 @@ const path = require('path') const readline = require('readline') const { MODULE_SEARCH_PATH } = process.env +const WRITE_CHUNK_SIZE = parseInt(process.env.WRITE_CHUNK_SIZE, 10) function rewritePath(oldPath) { return oldPath @@ -61,9 +62,13 @@ async function getResponse(string) { } async function onLine(string) { - const response = await getResponse(string) + const buffer = Buffer.from(`${await getResponse(string)}\n`) - process.stdout.write(response) + for (let i = 0; i < buffer.length; i += WRITE_CHUNK_SIZE) { + let chunk = buffer.slice(i, i + WRITE_CHUNK_SIZE) + + process.stdout.write(chunk) + } } function startServer() { diff --git a/test/js/keyed-functions.js b/test/js/keyed-functions.js index b1fe992..5b188d7 100644 --- a/test/js/keyed-functions.js +++ b/test/js/keyed-functions.js @@ -16,6 +16,10 @@ function throwTypeError() { throw new TypeError('oops') } +function getBytes(size) { + return Buffer.alloc(size) +} + class Unserializable { constructor() { this.circularRef = this @@ -34,4 +38,13 @@ function getEnv() { return process.env } -module.exports = {uuid, hello, math: {add, sub}, throwTypeError, getIncompatibleReturnValue, getArgv, getEnv} +module.exports = { + uuid, + hello, + math: { add, sub }, + throwTypeError, + getBytes, + getIncompatibleReturnValue, + getArgv, + getEnv, +} diff --git a/test/nodejs_test.exs b/test/nodejs_test.exs index d1db11e..56e9963 100644 --- a/test/nodejs_test.exs +++ b/test/nodejs_test.exs @@ -1,5 +1,5 @@ defmodule NodeJS.Test do - use ExUnit.Case + use ExUnit.Case, async: true doctest NodeJS setup_all do @@ -22,6 +22,12 @@ defmodule NodeJS.Test do |> String.trim() end + describe "large payload" do + test "does not explode" do + NodeJS.call!({"keyed-functions", "getBytes"}, [128_000]) + end + end + describe "calling default-function-echo" do test "returns first arg" do assert 1 == NodeJS.call!("default-function-echo", [1])