Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker: allow sending huge messages #607

Merged
merged 2 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 59 additions & 8 deletions src/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "private.h"
#include "tjs.h"

#include <string.h>
#include <unistd.h>

extern const uint8_t tjs__worker_bootstrap[];
Expand All @@ -46,12 +47,24 @@ typedef struct {
uv_stream_t stream;
uv_tcp_t tcp;
} h;
struct {
union {
uint64_t u64;
uint8_t u8[8];
} total_size;
uint8_t *data;
uint64_t nread;
} reading;
JSValue events[MSGPIPE_EVENT_MAX];
} TJSMessagePipe;

typedef struct {
uv_write_t req;
uint8_t *data;
union {
uint64_t u64;
uint8_t u8[8];
} data_size;
} TJSMessagePipeWriteReq;

static void uv__close_cb(uv_handle_t *handle) {
Expand Down Expand Up @@ -117,8 +130,14 @@ static void uv__alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *b
TJSMessagePipe *p = handle->data;
CHECK_NOT_NULL(p);

buf->base = js_malloc(p->ctx, suggested_size);
buf->len = suggested_size;
if (p->reading.data) {
buf->base = (char *) p->reading.data + p->reading.nread;
uint64_t remaining = p->reading.total_size.u64 - p->reading.nread;
buf->len = remaining > suggested_size ? suggested_size : remaining;
} else {
buf->base = (char *) p->reading.total_size.u8;
buf->len = sizeof(p->reading.total_size.u8);
}
}

static void uv__read_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) {
Expand All @@ -129,7 +148,9 @@ static void uv__read_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)

if (nread < 0) {
uv_read_stop(&p->h.stream);
js_free(ctx, buf->base);
if (p->reading.data)
js_free(ctx, p->reading.data);
memset(&p->reading, 0, sizeof(p->reading));
if (nread != UV_EOF) {
JSValue error = tjs_new_error(ctx, nread);
emit_msgpipe_event(p, MSGPIPE_EVENT_MESSAGE_ERROR, error);
Expand All @@ -138,15 +159,43 @@ static void uv__read_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
return;
}

// TODO: the entire object might not have come in a single packet.
if (!p->reading.data) {
size_t len_size = sizeof(p->reading.total_size.u8);

/* This is a bogus read, likely a zero-read. Just return the buffer. */
if (nread != len_size)
return;

uint64_t total_size = p->reading.total_size.u64;
CHECK_GE(total_size, 0);
p->reading.data = js_malloc(ctx, total_size);

return;
}

/* We are continuing a partial read. */
uint64_t total_size = p->reading.total_size.u64;
p->reading.nread += nread;

if (p->reading.nread < total_size) {
/* We still need to read more. */

return;
}

CHECK_EQ(p->reading.nread, total_size);

/* We have a complete buffer now. */
int flags = JS_READ_OBJ_REFERENCE;
JSValue obj = JS_ReadObject(ctx, (const uint8_t *) buf->base, buf->len, flags);
JSValue obj = JS_ReadObject(ctx, (const uint8_t *) p->reading.data, total_size, flags);
if (JS_IsException(obj))
emit_msgpipe_event(p, MSGPIPE_EVENT_MESSAGE_ERROR, JS_GetException(ctx));
else
emit_msgpipe_event(p, MSGPIPE_EVENT_MESSAGE, obj);
JS_FreeValue(ctx, obj);
js_free(ctx, buf->base);

js_free(ctx, p->reading.data);
memset(&p->reading, 0, sizeof(p->reading));
}

static JSValue tjs_new_msgpipe(JSContext *ctx, uv_os_sock_t fd) {
Expand Down Expand Up @@ -211,9 +260,11 @@ static JSValue tjs_msgpipe_postmessage(JSContext *ctx, JSValue this_val, int arg

wr->req.data = wr;
wr->data = buf;
wr->data_size.u64 = len;

uv_buf_t b = uv_buf_init((char *) buf, len);
int r = uv_write(&wr->req, &p->h.stream, &b, 1, uv__write_cb);
uv_buf_t bufs[2] = { uv_buf_init((char *) wr->data_size.u8, sizeof(wr->data_size.u8)),
uv_buf_init((char *) buf, len) };
int r = uv_write(&wr->req, &p->h.stream, bufs, 2, uv__write_cb);
if (r != 0) {
js_free(ctx, buf);
js_free(ctx, wr);
Expand Down
3 changes: 3 additions & 0 deletions tests/helpers/worker-echo.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@ addEventListener('message', function(e) {
postMessage(e.data);
});

addEventListener('messageerror', function(e) {
throw new Error(`Opps! ${e}`);
});
26 changes: 26 additions & 0 deletions tests/test-worker-large-payload.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import assert from 'tjs:assert';
import path from 'tjs:path';


const data = {
x: new Array(65536).fill('x').join(''),
y: new Array(65536).fill('y').join(''),
z: 1234
};
const w = new Worker(path.join(import.meta.dirname, 'helpers', 'worker-echo.js'));
const timer = setTimeout(() => {
w.terminate();
assert.fail('Timeout out waiting for worker');
}, 10000);
w.onmessage = event => {
clearTimeout(timer);
const recvData = event.data;
assert.eq(data.x, recvData.x, 'Message received matches');
assert.eq(data.y, recvData.y, 'Message received matches');
assert.eq(data.z, recvData.z, 'Message received matches');
w.terminate();
};
w.onmessageerror = event => {
assert.fail(`Error receiving message from worker: ${event}`);
};
w.postMessage(data);
Loading