Skip to content

Commit

Permalink
worker: allow sending huge messages
Browse files Browse the repository at this point in the history
When sending a payload, prefix it with its length. Then on the receiving
end, we read the length and the entire buffer in chunks.
  • Loading branch information
saghul committed Jul 15, 2024
1 parent 8f7e3e6 commit efe339e
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 14 deletions.
124 changes: 110 additions & 14 deletions src/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "private.h"
#include "tjs.h"

#include <string.h>
#include <unistd.h>

extern const uint8_t tjs__worker_bootstrap[];
Expand All @@ -46,12 +47,22 @@ typedef struct {
uv_stream_t stream;
uv_tcp_t tcp;
} h;
struct {
bool partial;
uint8_t *data;
size_t total_len;
size_t read_len;
} reading;
JSValue events[MSGPIPE_EVENT_MAX];
} TJSMessagePipe;

typedef struct {
uv_write_t req;
uint8_t *data;
union {
uint64_t u64;
uint8_t u8[8];
} data_size;
} TJSMessagePipeWriteReq;

static void uv__close_cb(uv_handle_t *handle) {
Expand Down Expand Up @@ -113,12 +124,31 @@ static void emit_msgpipe_event(TJSMessagePipe *p, int event, JSValue arg) {
CHECK_EQ(JS_EnqueueJob(ctx, emit_event, 2, (JSValue *) &args), 0);
}

static void tjs_msgpipe_read_object(TJSMessagePipe *p, const uint8_t *data, size_t len) {
CHECK_NOT_NULL(p);

JSContext *ctx = p->ctx;

int flags = JS_READ_OBJ_REFERENCE;
JSValue obj = JS_ReadObject(ctx, data, len, flags);
if (JS_IsException(obj))
emit_msgpipe_event(p, MSGPIPE_EVENT_MESSAGE_ERROR, JS_GetException(ctx));
else
emit_msgpipe_event(p, MSGPIPE_EVENT_MESSAGE, obj);
JS_FreeValue(ctx, obj);
}

static void uv__alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
TJSMessagePipe *p = handle->data;
CHECK_NOT_NULL(p);

buf->base = js_malloc(p->ctx, suggested_size);
buf->len = suggested_size;
if (p->reading.partial) {
buf->base = (char *) p->reading.data + p->reading.read_len;
buf->len = p->reading.total_len - p->reading.read_len;
} else {
buf->base = js_malloc(p->ctx, suggested_size);
buf->len = suggested_size;
}
}

static void uv__read_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) {
Expand All @@ -129,7 +159,10 @@ static void uv__read_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)

if (nread < 0) {
uv_read_stop(&p->h.stream);
js_free(ctx, buf->base);
if (p->reading.partial)
js_free(ctx, p->reading.data);
else
js_free(ctx, buf->base);
if (nread != UV_EOF) {
JSValue error = tjs_new_error(ctx, nread);
emit_msgpipe_event(p, MSGPIPE_EVENT_MESSAGE_ERROR, error);
Expand All @@ -138,15 +171,76 @@ static void uv__read_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
return;
}

// TODO: the entire object might not have come in a single packet.
int flags = JS_READ_OBJ_REFERENCE;
JSValue obj = JS_ReadObject(ctx, (const uint8_t *) buf->base, buf->len, flags);
if (JS_IsException(obj))
emit_msgpipe_event(p, MSGPIPE_EVENT_MESSAGE_ERROR, JS_GetException(ctx));
else
emit_msgpipe_event(p, MSGPIPE_EVENT_MESSAGE, obj);
JS_FreeValue(ctx, obj);
js_free(ctx, buf->base);
if (!p->reading.partial) {
union {
uint64_t u64;
uint8_t u8[8];
} u;
size_t len_size = sizeof(u.u8);

/* This is a bogus read, likely a zero-read. Just return the buffer. */
if (nread < len_size) {
js_free(ctx, buf->base);
return;
}

memcpy(u.u8, buf->base, len_size);
size_t total_len = u.u64;

/* Check if the total length exceeds what we got in this single read operation. */
if (total_len > nread - len_size) {
size_t read_len = nread - len_size;

p->reading.data = js_malloc(ctx, total_len);
if (!p->reading.data) {
js_free(ctx, buf->base);
return;
}

p->reading.partial = true;
p->reading.total_len = total_len;
p->reading.read_len = read_len;
memcpy(p->reading.data, buf->base + len_size, read_len);
js_free(ctx, buf->base);

return;
}

/* We managed to read the whole packet in a single read operation. */
tjs_msgpipe_read_object(p, (const uint8_t *) buf->base + len_size, total_len);
size_t remaining = nread - len_size - total_len;
if (remaining > 0) {
char *data = js_malloc(ctx, remaining);
if (!data) {
goto fail1;
}
memcpy(data, buf->base + len_size + total_len, remaining);
uv_buf_t b = uv_buf_init(data, remaining);
uv__read_cb(handle, remaining, &b);
}

fail1:;
js_free(ctx, buf->base);

return;
}

/* We are continuing a partial read. */
size_t remaining = p->reading.total_len - p->reading.read_len;
size_t chunk_size = remaining <= nread ? remaining : nread;
memcpy(p->reading.data + p->reading.read_len, buf->base, chunk_size);
p->reading.read_len += chunk_size;

if (p->reading.read_len < p->reading.total_len) {
/* We still need to read more. */

return;
}

/* We have a complete buffer now. */
tjs_msgpipe_read_object(p, (const uint8_t *) p->reading.data, p->reading.total_len);
js_free(ctx, p->reading.data);
memset(&p->reading, 0, sizeof(p->reading));
}

static JSValue tjs_new_msgpipe(JSContext *ctx, uv_os_sock_t fd) {
Expand Down Expand Up @@ -211,9 +305,11 @@ static JSValue tjs_msgpipe_postmessage(JSContext *ctx, JSValue this_val, int arg

wr->req.data = wr;
wr->data = buf;
wr->data_size.u64 = len;

uv_buf_t b = uv_buf_init((char *) buf, len);
int r = uv_write(&wr->req, &p->h.stream, &b, 1, uv__write_cb);
uv_buf_t bufs[2] = { uv_buf_init((char *) wr->data_size.u8, sizeof(wr->data_size.u8)),
uv_buf_init((char *) buf, len) };
int r = uv_write(&wr->req, &p->h.stream, bufs, 2, uv__write_cb);
if (r != 0) {
js_free(ctx, buf);
js_free(ctx, wr);
Expand Down
3 changes: 3 additions & 0 deletions tests/helpers/worker-echo.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@ addEventListener('message', function(e) {
postMessage(e.data);
});

addEventListener('messageerror', function(e) {
throw new Error(`Opps! ${e}`);
});
26 changes: 26 additions & 0 deletions tests/test-worker-large-payload.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import assert from 'tjs:assert';
import path from 'tjs:path';


const data = {
x: new Array(65536).fill('x').join(''),
y: new Array(65536).fill('y').join(''),
z: 1234
};
const w = new Worker(path.join(import.meta.dirname, 'helpers', 'worker-echo.js'));
const timer = setTimeout(() => {
w.terminate();
assert.fail('Timeout out waiting for worker');
}, 10000);
w.onmessage = event => {
clearTimeout(timer);
const recvData = event.data;
assert.eq(data.x, recvData.x, 'Message received matches');
assert.eq(data.y, recvData.y, 'Message received matches');
assert.eq(data.z, recvData.z, 'Message received matches');
w.terminate();
};
w.onmessageerror = event => {
assert.fail(`Error receiving message from worker: ${event}`);
};
w.postMessage(data);

0 comments on commit efe339e

Please sign in to comment.